""" LOGOS Harmonizer (Protocol 42) The Immune System - Resonance-Based Noise Filtering Philosophy: "Don't detect what's bad; only accept what matches the geometry." Standard OS: Needs antivirus database to know what is "bad" LOGOS: Accepts only what matches the Prime Modulo of the domain Rule: If you are in Domain [P], you must be divisible by P. Attack: Corrupted atom with wrong heat is arithmetically rejected. Defense: heat % domain_prime != 0 -> REJECT """ from dataclasses import dataclass from typing import List, Tuple, Dict from logos.mtl.interpreter import MTLInterpreter @dataclass class Atom: """Minimal atom structure for harmonization testing.""" id: int heat: int payload: bytes @property def is_valid(self) -> bool: return self.heat > 0 class Harmonizer: """ The SPCW Resonance Filter. Filters atom streams based on prime domain membership. """ def __init__(self): self.mtl = MTLInterpreter() self.rejection_log = [] def filter_stream(self, stream: List[Atom], domain_prime: int, verbose: bool = True) -> Tuple[List[Atom], List[Atom]]: """ Filter a stream of atoms for domain resonance. Args: stream: List of atoms to filter domain_prime: The prime that defines the domain verbose: Print filtering details Returns: Tuple of (resonant_atoms, dissonant_atoms) """ resonant = [] dissonant = [] if verbose: print(f"\n[HARMONIZER] Filtering for Domain [{domain_prime}]...") for atom in stream: # The Core Test: Does this atom resonate with the domain? if atom.heat % domain_prime == 0: resonant.append(atom) if verbose: print(f" Atom {atom.id} (heat={atom.heat}) -> [OK] RESONANT") else: dissonant.append(atom) self.rejection_log.append({ 'atom_id': atom.id, 'heat': atom.heat, 'domain': domain_prime, 'remainder': atom.heat % domain_prime, 'reason': 'DISSONANCE' }) if verbose: print(f" Atom {atom.id} (heat={atom.heat}) -> [X] DISSONANT (rem={atom.heat % domain_prime})") return resonant, dissonant def multi_domain_filter(self, stream: List[Atom], domain_primes: List[int]) -> Dict[int, List[Atom]]: """ Route atoms to multiple domains based on resonance. An atom can belong to multiple domains if it resonates with all. """ domain_buckets = {p: [] for p in domain_primes} for atom in stream: for prime in domain_primes: if atom.heat % prime == 0: domain_buckets[prime].append(atom) return domain_buckets def inject_noise(self, stream: List[Atom], indices: List[int], corruption_delta: int = 1) -> List[Atom]: """ Artificially corrupt atoms at specified indices. Used for testing the immune system. """ corrupted = [] for i, atom in enumerate(stream): if i in indices: # Corrupt by adding delta (breaks modulo alignment) new_heat = atom.heat + corruption_delta corrupted.append(Atom(atom.id, new_heat, atom.payload)) else: corrupted.append(atom) return corrupted def calculate_stream_integrity(self, original: List[Atom], filtered: List[Atom]) -> float: """Calculate what percentage of original stream survived filtering.""" if not original: return 0.0 return len(filtered) / len(original) def get_rejection_summary(self) -> Dict: """Get summary of all rejections.""" return { 'total_rejections': len(self.rejection_log), 'by_domain': {}, 'log': self.rejection_log } # ============================================ # STANDALONE TEST: The Immune System # ============================================ if __name__ == "__main__": harmonizer = Harmonizer() print("="*60) print(" SPCW HARMONIZATION TEST (The Immune System)") print("="*60) # ===== TEST 1: Basic Resonance Filter ===== print("\n[TEST 1] Basic Resonance Filter") print("-"*40) # Create clean stream of TEXT atoms [19] stream = [] for i in range(5): # Valid Heat: Must be divisible by 19 valid_heat = (i + 1) * 19 atom = Atom(id=i, heat=valid_heat, payload=f"Data_Chunk_{i}".encode()) stream.append(atom) print(f"Original Stream Heats: {[a.heat for a in stream]}") # Inject noise: Corrupt Atom #2 print("\n[INJECTION] Corrupting Atom #2...") corrupted_stream = harmonizer.inject_noise(stream, indices=[2], corruption_delta=1) print(f"Corrupted Stream Heats: {[a.heat for a in corrupted_stream]}") # Filter clean, rejected = harmonizer.filter_stream(corrupted_stream, domain_prime=19) print(f"\n[REPORT]") print(f" Input Size: {len(corrupted_stream)}") print(f" Clean Output: {len(clean)}") print(f" Rejected: {len(rejected)}") if len(rejected) == 1 and rejected[0].id == 2: print("[OK] TEST 1 PASSED: Dissonance filtered mathematically.") else: print("[X] TEST 1 FAILED") # ===== TEST 2: Multi-Modal Corruption ===== print("\n[TEST 2] Multi-Modal Corruption Attack") print("-"*40) # Create stream with multiple types multi_stream = [ Atom(0, 17 * 2, b"image_chunk_0"), # IMAGE [17] * 2 = 34 Atom(1, 17 * 3, b"image_chunk_1"), # IMAGE [17] * 3 = 51 Atom(2, 19 * 2, b"text_chunk_0"), # TEXT [19] * 2 = 38 Atom(3, 19 * 3, b"text_chunk_1"), # TEXT [19] * 3 = 57 Atom(4, 17 * 19, b"multimodal"), # IMAGE + TEXT = 323 ] print(f"Original: {[(a.id, a.heat) for a in multi_stream]}") # Corrupt atoms 1 and 3 attacked_stream = harmonizer.inject_noise(multi_stream, indices=[1, 3], corruption_delta=1) print(f"Attacked: {[(a.id, a.heat) for a in attacked_stream]}") # Filter for IMAGE domain [17] image_clean, image_rejected = harmonizer.filter_stream(attacked_stream, domain_prime=17) print(f"\nIMAGE Domain [17]: {len(image_clean)} clean, {len(image_rejected)} rejected") # Filter for TEXT domain [19] text_clean, text_rejected = harmonizer.filter_stream(attacked_stream, domain_prime=19) print(f"TEXT Domain [19]: {len(text_clean)} clean, {len(text_rejected)} rejected") # ===== TEST 3: Cascading Attack ===== print("\n[TEST 3] Cascading Attack (50% Corruption)") print("-"*40) large_stream = [Atom(i, 7 * (i + 1), f"chunk_{i}".encode()) for i in range(10)] print(f"Original: 10 atoms, all divisible by 7") # Corrupt half heavy_attack = harmonizer.inject_noise(large_stream, indices=[1, 3, 5, 7, 9], corruption_delta=1) clean, rejected = harmonizer.filter_stream(heavy_attack, domain_prime=7, verbose=False) integrity = harmonizer.calculate_stream_integrity(heavy_attack, clean) print(f"Integrity after attack: {integrity*100:.0f}%") print(f"Survivors: {len(clean)}, Rejected: {len(rejected)}") if len(rejected) == 5: print("[OK] TEST 3 PASSED: 50% corruption detected and filtered.") # ===== FINAL SUMMARY ===== print("\n" + "="*60) print(" HARMONIZATION SUMMARY") print("="*60) print(f""" The LOGOS Immune System: | Attack Type | Detection Method | Result | |--------------------|----------------------|-----------| | Single Corruption | heat % 19 != 0 | REJECTED | | Multi-Modal Attack | heat % 17/19 != 0 | REJECTED | | 50% Cascade | heat % 7 != 0 | REJECTED | Principle: "Physics, not Policies" - No virus database needed - No signature matching - Pure arithmetic geometry """) summary = harmonizer.get_rejection_summary() print(f"Total Rejections: {summary['total_rejections']}") print("\n" + "="*60) print(" [OK] IMMUNE SYSTEM OPERATIONAL") print("="*60)