import numpy as np from dataclasses import dataclass, field from typing import Dict, List, Tuple, Optional, Any, Literal from enum import Enum import json from datetime import datetime import logging import matplotlib.pyplot as plt from scipy.special import expit as sigmoid # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ================================ ENUMS ================================ class FrequencyBand(Enum): DELTA = 'delta' THETA = 'theta' ALPHA = 'alpha' BETA = 'beta' GAMMA = 'gamma' class StreamType(Enum): STREAM_A = "Stream A: Hypo-coherence" STREAM_B = "Stream B: Hyper-coherence" DUAL = "Dual Stream" class SeamType(Enum): TYPE_I = "Type I: Perfect Recovery" TYPE_II = "Type II: Acceptable Loss" TYPE_III = "Type III: Failed Recovery" class SystemMode(Enum): STANDARD = "standard" HIGH_SENSITIVITY = "high_sensitivity" STABILITY = "stability" RECOVERY = "recovery" ADAPTIVE = "adaptive" class ChainState(Enum): HYPO = "hypo-coherent" HYPER = "hyper-coherent" INTACT = "intact" # ================================ DATACLASSES ================================ @dataclass class SpatialPosition: x: float y: float m: int n: int def distance_to(self, other: 'SpatialPosition') -> float: return np.sqrt((self.x - other.x)**2 + (self.y - other.y)**2) def radius(self) -> float: return np.sqrt(self.x**2 + self.y**2) def angle(self) -> float: return np.arctan2(self.y, self.x) @dataclass class ChainComponent: band: FrequencyBand positions: List[SpatialPosition] coherence: float phase_std: float state: ChainState stream: StreamType @dataclass class DualAuditResult: # Stream A metrics delta_kappa_A: float s_A: float # Stream A residual # Stream B metrics delta_kappa_B: float s_B: float # Stream B residual # Composite metrics s_composite: float tau_R: float D_C: float D_omega: float R: float I: float # Results seam_type: SeamType audit_pass: bool active_streams: List[StreamType] details: Dict[str, Any] = field(default_factory=dict) @dataclass class AdaptiveThresholds: tau_low: float tau_high: float tau_phase: float alpha: float stress: float mode: SystemMode # ================================ CONFIGURATION ================================ class ABCRConfig: """Adaptive Bi-Coupled Coherence Recovery Configuration""" # Spatial grid SPATIAL_GRID_M = 8 SPATIAL_GRID_N = 8 SPATIAL_UNIT = 0.1 PROPAGATION_SPEED = 1.0 # Base thresholds TAU_BASE = 0.3 TAU_PHASE = 0.5 # Mode-specific parameters MODE_PARAMS = { SystemMode.STANDARD: { 'alpha_base': 0.6, 'alpha_mod': 0.1, 'rho': 0.7, 'novelty': 0.1, 'baseline': 0.6 }, SystemMode.HIGH_SENSITIVITY: { 'alpha_base': 0.65, 'alpha_mod': 0.15, 'rho': 0.6, 'novelty': 0.12, 'baseline': 0.65, 'tau_low_factor': 0.8, 'tau_high_factor': 1.2 }, SystemMode.STABILITY: { 'alpha_base': 0.5, 'alpha_mod': 0.05, 'rho': 0.8, 'novelty': 0.05, 'baseline': 0.5 }, SystemMode.RECOVERY: { 'alpha_base': 0.65, 'alpha_mod': 0.15, 'rho': 0.6, 'novelty': 0.15, 'baseline': 0.7 } } # Coupling parameters LAMBDA_CROSS_STREAM = 0.3 # Cross-stream coupling coefficient # Audit tolerances AUDIT_TOLERANCE = 0.01 TYPE_I_THRESHOLD = 1e-6 # Emergency thresholds EMERGENCY_HYPO_THRESHOLD = 0.1 EMERGENCY_HYPER_THRESHOLD = 0.9 # Convergence parameters MAX_RECONSTRUCTION_ITERATIONS = 100 CONVERGENCE_TOLERANCE = 0.001 # Frequency band center frequencies (Hz) BAND_FREQUENCIES = { FrequencyBand.DELTA: 2.0, FrequencyBand.THETA: 6.0, FrequencyBand.ALPHA: 10.0, FrequencyBand.BETA: 20.0, FrequencyBand.GAMMA: 40.0 } # ================================ CORE COMPONENTS ================================ class DualStreamEncoder: """Encodes states into forward and mirror capsules for bi-directional processing""" def __init__(self, M: int = ABCRConfig.SPATIAL_GRID_M, N: int = ABCRConfig.SPATIAL_GRID_N): self.M = M self.N = N self.positions = self._initialize_positions() self.spatial_cache = {} def _initialize_positions(self) -> List[SpatialPosition]: positions = [] for m in range(-self.M, self.M + 1): for n in range(-self.N, self.N + 1): x = m * ABCRConfig.SPATIAL_UNIT y = n * ABCRConfig.SPATIAL_UNIT positions.append(SpatialPosition(x, y, m, n)) return positions def encode_forward_capsule(self, kappa_bands: Dict[FrequencyBand, float], phi_bands: Dict[FrequencyBand, float]) -> np.ndarray: """Standard forward encoding: C_F[m,n,b] = G_r * κ_b * exp(iφ_b - ik_b*r)""" num_bands = len(FrequencyBand) capsule = np.zeros((2*self.M+1, 2*self.N+1, num_bands), dtype=complex) for pos in self.positions: r = pos.radius() G_r = np.exp(-r / (self.M * ABCRConfig.SPATIAL_UNIT)) for b_idx, band in enumerate(FrequencyBand): omega_b = ABCRConfig.BAND_FREQUENCIES[band] kappa_b = kappa_bands[band] phi_b = phi_bands[band] k_b = 2 * np.pi * omega_b / ABCRConfig.PROPAGATION_SPEED phase_shift = k_b * r total_phase = phi_b - phase_shift capsule[pos.m + self.M, pos.n + self.N, b_idx] = \ G_r * kappa_b * np.exp(1j * total_phase) return capsule def encode_mirror_capsule(self, kappa_bands: Dict[FrequencyBand, float], phi_bands: Dict[FrequencyBand, float]) -> np.ndarray: """Mirror encoding: C_M[m,n,b] = G_r * (1-κ_b) * exp(i(π-φ_b) + ik_b*r)""" num_bands = len(FrequencyBand) capsule = np.zeros((2*self.M+1, 2*self.N+1, num_bands), dtype=complex) for pos in self.positions: r = pos.radius() G_r = np.exp(-r / (self.M * ABCRConfig.SPATIAL_UNIT)) for b_idx, band in enumerate(FrequencyBand): omega_b = ABCRConfig.BAND_FREQUENCIES[band] kappa_b = kappa_bands[band] phi_b = phi_bands[band] k_b = 2 * np.pi * omega_b / ABCRConfig.PROPAGATION_SPEED phase_shift = k_b * r total_phase = (np.pi - phi_b) + phase_shift capsule[pos.m + self.M, pos.n + self.N, b_idx] = \ G_r * (1 - kappa_b) * np.exp(1j * total_phase) return capsule def compute_spatial_coupling(self, pos1: SpatialPosition, pos2: SpatialPosition, band1: FrequencyBand, band2: FrequencyBand) -> float: """Compute spatial coupling strength between positions and bands""" cache_key = (pos1.m, pos1.n, pos2.m, pos2.n, band1.value, band2.value) if cache_key in self.spatial_cache: return self.spatial_cache[cache_key] distance = pos1.distance_to(pos2) spatial_factor = np.exp(-distance / ABCRConfig.SPATIAL_UNIT) freq_diff = abs(ABCRConfig.BAND_FREQUENCIES[band1] - ABCRConfig.BAND_FREQUENCIES[band2]) freq_factor = np.exp(-freq_diff / 10.0) coupling = spatial_factor * freq_factor self.spatial_cache[cache_key] = coupling return coupling class AdaptiveThresholdManager: """Manages adaptive thresholds based on system stress and mode""" def compute_system_stress(self, kappa: Dict[FrequencyBand, float], kappa_history: List[Dict[FrequencyBand, float]] = None) -> float: """Compute system stress indicator σ(t)""" if kappa_history and len(kappa_history) > 0: # Compute rate of change prev_kappa = kappa_history[-1] dk_dt = {b: abs(kappa[b] - prev_kappa[b]) for b in FrequencyBand} numerator = sum(dk_dt.values()) denominator = sum(kappa.values()) + 1e-10 stress = min(1.0, numerator / denominator) else: # Use deviation from balanced state balanced = 0.5 deviations = [abs(k - balanced) for k in kappa.values()] stress = np.mean(deviations) * 2 return np.clip(stress, 0, 1) def compute_adaptive_thresholds(self, stress: float, mode: SystemMode) -> AdaptiveThresholds: """Compute adaptive thresholds based on stress and mode""" params = ABCRConfig.MODE_PARAMS[mode] # Base thresholds tau_base = ABCRConfig.TAU_BASE # Adaptive adjustments tau_low = tau_base * (1 - stress * 0.3) tau_high = 1 - tau_base * (1 - stress * 0.3) # Mode-specific adjustments if mode == SystemMode.HIGH_SENSITIVITY: tau_low *= params.get('tau_low_factor', 0.8) tau_high *= params.get('tau_high_factor', 1.2) elif mode == SystemMode.STABILITY: tau_low *= 1.1 tau_high *= 0.9 # Adaptive alpha alpha = params['alpha_base'] + params['alpha_mod'] * stress # Phase coherence threshold (less adaptive) tau_phase = ABCRConfig.TAU_PHASE * (1 + stress * 0.1) return AdaptiveThresholds( tau_low=np.clip(tau_low, 0.1, 0.5), tau_high=np.clip(tau_high, 0.5, 0.9), tau_phase=tau_phase, alpha=np.clip(alpha, 0.3, 0.8), stress=stress, mode=mode ) class DualStreamProcessor: """Processes both hypo and hyper coherence streams""" def __init__(self, encoder: DualStreamEncoder): self.encoder = encoder self.embedding_cache = {} def detect_broken_chains(self, kappa: Dict[FrequencyBand, float], C_F: np.ndarray, C_M: np.ndarray, thresholds: AdaptiveThresholds) -> Tuple[List[ChainComponent], List[ChainComponent], Dict[FrequencyBand, float]]: """Detect broken chains in both streams""" broken_A = [] # Hypo-coherent chains broken_B = [] # Hyper-coherent chains intact = {} for b_idx, band in enumerate(FrequencyBand): kappa_b = kappa[band] if kappa_b < thresholds.tau_low: # Hypo-coherent: use forward capsule phase_coh = self._compute_phase_coherence(C_F, band, b_idx) if phase_coh < thresholds.tau_phase: component = ChainComponent( band=band, positions=self._get_significant_positions(C_F, b_idx), coherence=kappa_b, phase_std=np.sqrt(1 - phase_coh), state=ChainState.HYPO, stream=StreamType.STREAM_A ) broken_A.append(component) logger.info(f"Stream A - Hypo-coherent chain: {band.value} " f"(κ={kappa_b:.3f}, phase_coh={phase_coh:.3f})") else: intact[band] = kappa_b elif kappa_b > thresholds.tau_high: # Hyper-coherent: use mirror capsule phase_coh = self._compute_phase_coherence(C_M, band, b_idx) if phase_coh < thresholds.tau_phase: component = ChainComponent( band=band, positions=self._get_significant_positions(C_M, b_idx), coherence=kappa_b, phase_std=np.sqrt(1 - phase_coh), state=ChainState.HYPER, stream=StreamType.STREAM_B ) broken_B.append(component) logger.info(f"Stream B - Hyper-coherent chain: {band.value} " f"(κ={kappa_b:.3f}, phase_coh={phase_coh:.3f})") else: intact[band] = kappa_b else: intact[band] = kappa_b return broken_A, broken_B, intact def _compute_phase_coherence(self, capsule: np.ndarray, band: FrequencyBand, b_idx: int) -> float: """Compute phase coherence for a band""" phases = [] for pos in self.encoder.positions: value = capsule[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx] if np.abs(value) > 1e-6: phases.append(np.angle(value)) if len(phases) < 2: return 0.0 # Compute circular variance mean_vector = np.mean(np.exp(1j * np.array(phases))) coherence = np.abs(mean_vector) return coherence def _get_significant_positions(self, capsule: np.ndarray, b_idx: int, threshold: float = 1e-3) -> List[SpatialPosition]: """Get positions with significant amplitude""" positions = [] for pos in self.encoder.positions: amplitude = np.abs(capsule[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx]) if amplitude > threshold: positions.append(pos) return positions class BiCoupledReconstructor: """Reconstructs coherence using bi-coupled Hamiltonians""" def __init__(self, encoder: DualStreamEncoder): self.encoder = encoder self.H_A = {} # Stream A Hamiltonians self.H_B = {} # Stream B Hamiltonians def compute_hamiltonians(self, broken_A: List[ChainComponent], broken_B: List[ChainComponent], intact: Dict[FrequencyBand, float], C_F: np.ndarray, C_M: np.ndarray): """Compute bi-coupled Hamiltonians for both streams""" # Stream A Hamiltonians (for hypo-coherent chains) for component in broken_A: band = component.band b_idx = list(FrequencyBand).index(band) # Forward field bias h_F = 0 for pos in component.positions: h_F += C_F[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx] # Mirror field contribution h_M = 0 for pos in component.positions: h_M += C_M[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx] # Coupling to intact bands J_coupling = 0 for intact_band, intact_val in intact.items(): for pos1 in component.positions: for pos2 in self.encoder.positions: coupling = self.encoder.compute_spatial_coupling( pos1, pos2, band, intact_band ) J_coupling += coupling * intact_val # Bi-coupled Hamiltonian for Stream A self.H_A[band] = h_F + J_coupling + ABCRConfig.LAMBDA_CROSS_STREAM * h_M # Stream B Hamiltonians (for hyper-coherent chains) for component in broken_B: band = component.band b_idx = list(FrequencyBand).index(band) # Mirror field bias h_M = 0 for pos in component.positions: h_M += C_M[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx] # Forward field contribution h_F = 0 for pos in component.positions: h_F += C_F[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx] # Coupling to intact bands (inverted) J_coupling = 0 for intact_band, intact_val in intact.items(): for pos1 in component.positions: for pos2 in self.encoder.positions: coupling = self.encoder.compute_spatial_coupling( pos1, pos2, band, intact_band ) J_coupling += coupling * (1 - intact_val) # Bi-coupled Hamiltonian for Stream B self.H_B[band] = h_M + J_coupling + ABCRConfig.LAMBDA_CROSS_STREAM * h_F def reconstruct(self, broken_A: List[ChainComponent], broken_B: List[ChainComponent], intact: Dict[FrequencyBand, float]) -> Dict[FrequencyBand, float]: """Iteratively reconstruct broken chains using bi-coupled fields""" kappa_recon = intact.copy() # Initialize broken bands for component in broken_A: kappa_recon[component.band] = 0.3 # Start from low coherence for component in broken_B: kappa_recon[component.band] = 0.7 # Start from high coherence # Iterative reconstruction for iteration in range(ABCRConfig.MAX_RECONSTRUCTION_ITERATIONS): converged = True # Reconstruct hypo-coherent chains for component in broken_A: band = component.band field = np.abs(self.H_A.get(band, 0)) # Sigmoid activation for Stream A kappa_new = sigmoid(field) if abs(kappa_new - kappa_recon[band]) > ABCRConfig.CONVERGENCE_TOLERANCE: converged = False kappa_recon[band] = kappa_new # Reconstruct hyper-coherent chains for component in broken_B: band = component.band field = np.abs(self.H_B.get(band, 0)) # Inverted sigmoid for Stream B kappa_new = 1 - sigmoid(field) if abs(kappa_new - kappa_recon[band]) > ABCRConfig.CONVERGENCE_TOLERANCE: converged = False kappa_recon[band] = kappa_new if converged: logger.info(f"Reconstruction converged in {iteration+1} iterations") break return kappa_recon class DualStreamAuditor: """Performs integrity audit across both processing streams""" def audit(self, kappa_original: Dict[FrequencyBand, float], kappa_recon: Dict[FrequencyBand, float], broken_A: List[ChainComponent], broken_B: List[ChainComponent], timestamp_original: float, timestamp_recon: float) -> DualAuditResult: """Perform dual-stream integrity audit""" # Compute per-stream metrics delta_kappa_A = self._compute_stream_delta(kappa_original, kappa_recon, broken_A) delta_kappa_B = self._compute_stream_delta(kappa_original, kappa_recon, broken_B) # Common metrics tau_R = abs(timestamp_recon - timestamp_original) D_C = self._compute_curvature_change(kappa_original, kappa_recon) D_omega = self._compute_entropy_drift(kappa_original, kappa_recon) R = self._compute_return_credit(kappa_original, kappa_recon) # Stream residuals s_A = R * tau_R - (delta_kappa_A + D_omega + D_C) if broken_A else 0 s_B = R * tau_R - (delta_kappa_B + D_omega + D_C) if broken_B else 0 # Composite score with weighting active_streams = [] if broken_A: active_streams.append(StreamType.STREAM_A) if broken_B: active_streams.append(StreamType.STREAM_B) if broken_A and broken_B: w_A = len(broken_A) / (len(broken_A) + len(broken_B)) w_B = len(broken_B) / (len(broken_A) + len(broken_B)) s_composite = w_A * s_A + w_B * s_B elif broken_A: s_composite = s_A elif broken_B: s_composite = s_B else: s_composite = 0 # Determine seam type delta_kappa_avg = np.mean([kappa_recon[b] - kappa_original[b] for b in FrequencyBand]) if abs(s_composite) < ABCRConfig.AUDIT_TOLERANCE: if abs(delta_kappa_avg) < ABCRConfig.TYPE_I_THRESHOLD: seam_type = SeamType.TYPE_I else: seam_type = SeamType.TYPE_II audit_pass = True else: seam_type = SeamType.TYPE_III audit_pass = False # Compute integrity index kappa_final = np.mean(list(kappa_recon.values())) I = np.exp(kappa_final) result = DualAuditResult( delta_kappa_A=delta_kappa_A, s_A=s_A, delta_kappa_B=delta_kappa_B, s_B=s_B, s_composite=s_composite, tau_R=tau_R, D_C=D_C, D_omega=D_omega, R=R, I=I, seam_type=seam_type, audit_pass=audit_pass, active_streams=active_streams, details={ 'broken_A_count': len(broken_A), 'broken_B_count': len(broken_B), 'delta_kappa_avg': delta_kappa_avg } ) logger.info(f"Dual-stream audit: {seam_type.value}, " f"s_composite={s_composite:.6f}, pass={audit_pass}") return result def _compute_stream_delta(self, original: Dict[FrequencyBand, float], recon: Dict[FrequencyBand, float], broken: List[ChainComponent]) -> float: """Compute average delta for a specific stream""" if not broken: return 0 deltas = [recon[c.band] - original[c.band] for c in broken] return np.mean(deltas) if deltas else 0 def _compute_curvature_change(self, original: Dict[FrequencyBand, float], recon: Dict[FrequencyBand, float]) -> float: """Compute change in spectral curvature""" original_vals = np.array(list(original.values())) recon_vals = np.array(list(recon.values())) if len(original_vals) >= 3: original_curvature = np.mean(np.abs(np.diff(original_vals, n=2))) recon_curvature = np.mean(np.abs(np.diff(recon_vals, n=2))) return abs(recon_curvature - original_curvature) return 0.0 def _compute_entropy_drift(self, original: Dict[FrequencyBand, float], recon: Dict[FrequencyBand, float]) -> float: """Compute entropy-based drift metric""" errors = [recon[b] - original[b] for b in FrequencyBand] return np.std(errors) def _compute_return_credit(self, original: Dict[FrequencyBand, float], recon: Dict[FrequencyBand, float]) -> float: """Compute return credit based on recovery quality""" ratios = [] for band in FrequencyBand: if original[band] > 0: ratio = recon[band] / original[band] ratio = np.clip(ratio, 0, 2) ratios.append(1 - abs(1 - ratio)) return np.mean(ratios) if ratios else 0.0 class AdaptiveRenewalEngine: """Manages cognitive renewal with mode-dependent strategies""" def __init__(self): self.Pi = None # Invariant field self.renewal_history = [] self.release_history = [] def initialize_invariant_field(self, kappa_initial: Dict[FrequencyBand, float]): """Initialize the invariant field Pi""" self.Pi = kappa_initial.copy() logger.info(f"Invariant field initialized: mean κ = {np.mean(list(self.Pi.values())):.3f}") def update_invariant_field(self, kappa_current: Dict[FrequencyBand, float], beta: float = 0.1): """Update invariant field with exponential moving average""" if self.Pi is None: self.initialize_invariant_field(kappa_current) return for band in FrequencyBand: self.Pi[band] = (1 - beta) * self.Pi[band] + beta * kappa_current[band] def perform_renewal(self, kappa_fragmented: Dict[FrequencyBand, float], mode: SystemMode) -> Dict[FrequencyBand, float]: """Perform adaptive renewal based on mode""" if self.Pi is None: logger.warning("Cannot renew: Pi not initialized") return kappa_fragmented params = ABCRConfig.MODE_PARAMS[mode] rho = params['rho'] novelty = params['novelty'] baseline = params['baseline'] kappa_renewed = {} for band in FrequencyBand: xi = np.random.normal(0, novelty) kappa_renewed[band] = ( rho * self.Pi[band] + (1 - rho) * baseline + xi ) kappa_renewed[band] = np.clip(kappa_renewed[band], 0, 1) self.renewal_history.append({ 'timestamp': datetime.now().isoformat(), 'mode': mode.value, 'kappa_before': kappa_fragmented.copy(), 'kappa_after': kappa_renewed.copy(), 'Pi_state': self.Pi.copy() }) logger.info(f"Renewal ({mode.value}): mean κ before={np.mean(list(kappa_fragmented.values())):.3f}, " f"after={np.mean(list(kappa_renewed.values())):.3f}") return kappa_renewed # ================================ MAIN SYSTEM ================================ class AdaptiveBiCoupledCoherenceSystem: """Main ABCR system orchestrating all components""" def __init__(self, mode: SystemMode = SystemMode.STANDARD): self.mode = mode self.encoder = DualStreamEncoder() self.threshold_manager = AdaptiveThresholdManager() self.processor = DualStreamProcessor(self.encoder) self.reconstructor = BiCoupledReconstructor(self.encoder) self.auditor = DualStreamAuditor() self.renewal_engine = AdaptiveRenewalEngine() self.current_capsules = {'forward': None, 'mirror': None} self.kappa_history = [] self.system_history = [] logger.info(f"ABCR System initialized in {mode.value} mode") def process_coherence_state(self, kappa: Dict[FrequencyBand, float], phi: Dict[FrequencyBand, float], timestamp: float) -> Optional[Dict[FrequencyBand, float]]: """Main processing pipeline""" # Step 1: Compute adaptive thresholds stress = self.threshold_manager.compute_system_stress(kappa, self.kappa_history) thresholds = self.threshold_manager.compute_adaptive_thresholds(stress, self.mode) logger.info(f"System stress: {stress:.3f}, tau_low={thresholds.tau_low:.3f}, " f"tau_high={thresholds.tau_high:.3f}, alpha={thresholds.alpha:.3f}") # Step 2: Encode dual capsules C_F = self.encoder.encode_forward_capsule(kappa, phi) C_M = self.encoder.encode_mirror_capsule(kappa, phi) self.current_capsules = {'forward': C_F, 'mirror': C_M} # Step 3: Detect broken chains broken_A, broken_B, intact = self.processor.detect_broken_chains( kappa, C_F, C_M, thresholds ) # Step 4: Check emergency conditions if self._check_emergency_conditions(kappa, broken_A, broken_B): logger.critical("EMERGENCY DECOUPLE TRIGGERED") return None # Step 5: Handle based on detection results if not broken_A and not broken_B: logger.info("No broken chains detected - state stable") self.kappa_history.append(kappa.copy()) return kappa logger.info("=" * 70) logger.info(f"DUAL-STREAM RECOVERY: {len(broken_A)} hypo, {len(broken_B)} hyper chains") logger.info("=" * 70) # Step 6: Compute bi-coupled Hamiltonians self.reconstructor.compute_hamiltonians(broken_A, broken_B, intact, C_F, C_M) # Step 7: Reconstruct kappa_recon = self.reconstructor.reconstruct(broken_A, broken_B, intact) # Step 8: Audit audit_result = self.auditor.audit( kappa, kappa_recon, broken_A, broken_B, timestamp - 0.1 if self.kappa_history else timestamp, timestamp ) # Step 9: Apply renewal if audit passes if audit_result.audit_pass: kappa_final = self.renewal_engine.perform_renewal(kappa_recon, self.mode) self.renewal_engine.update_invariant_field(kappa_final) self._record_event('successful_recovery', timestamp, kappa, kappa_final, audit_result) self.kappa_history.append(kappa_final.copy()) logger.info(f"✓ Recovery successful: {audit_result.seam_type.value}") logger.info("=" * 70) return kappa_final else: logger.error(f"✗ Audit failed: {audit_result.seam_type.value}") self._record_event('failed_recovery', timestamp, kappa, kappa_recon, audit_result) logger.info("=" * 70) # Fallback recovery return self._fallback_recovery(kappa) def _check_emergency_conditions(self, kappa: Dict[FrequencyBand, float], broken_A: List[ChainComponent], broken_B: List[ChainComponent]) -> bool: """Check for emergency decouple conditions""" min_kappa = min(kappa.values()) max_kappa = max(kappa.values()) if min_kappa < ABCRConfig.EMERGENCY_HYPO_THRESHOLD: logger.critical(f"Emergency: Extreme hypo-coherence (min={min_kappa:.3f})") return True if max_kappa > ABCRConfig.EMERGENCY_HYPER_THRESHOLD: logger.critical(f"Emergency: Extreme hyper-coherence (max={max_kappa:.3f})") return True if len(broken_A) + len(broken_B) >= len(FrequencyBand): logger.critical("Emergency: All bands broken") return True return False def _fallback_recovery(self, kappa: Dict[FrequencyBand, float]) -> Dict[FrequencyBand, float]: """Fallback recovery when audit fails""" if self.renewal_engine.Pi is not None: # Use invariant field as fallback return self.renewal_engine.Pi.copy() else: # Default to balanced state return {band: 0.5 for band in FrequencyBand} def _record_event(self, event_type: str, timestamp: float, kappa_before: Dict[FrequencyBand, float], kappa_after: Dict[FrequencyBand, float], audit_result: DualAuditResult): """Record system event for history""" self.system_history.append({ 'timestamp': timestamp, 'event': event_type, 'kappa_before': kappa_before.copy(), 'kappa_after': kappa_after.copy(), 'audit': { 'seam_type': audit_result.seam_type.value, 's_composite': audit_result.s_composite, 's_A': audit_result.s_A, 's_B': audit_result.s_B, 'active_streams': [s.value for s in audit_result.active_streams] } }) def set_mode(self, mode: SystemMode): """Change operational mode""" self.mode = mode logger.info(f"System mode changed to: {mode.value}") def simulate_dynamics(self, duration: float = 10.0, dt: float = 0.1, scenario: str = "dual_stress") -> List[Dict]: """Simulate coherence dynamics with various scenarios""" time_points = int(duration / dt) coherence_history = [] # Initialize kappa_baseline = { FrequencyBand.DELTA: 0.72, FrequencyBand.THETA: 0.68, FrequencyBand.ALPHA: 0.75, FrequencyBand.BETA: 0.70, FrequencyBand.GAMMA: 0.65 } phi_baseline = { FrequencyBand.DELTA: 0.1, FrequencyBand.THETA: 0.3, FrequencyBand.ALPHA: 0.5, FrequencyBand.BETA: 0.7, FrequencyBand.GAMMA: 0.9 } current_kappa = kappa_baseline.copy() current_phi = phi_baseline.copy() # Initialize invariant field self.renewal_engine.initialize_invariant_field(current_kappa) for i in range(time_points): t = i * dt # Apply scenario if scenario == "dual_stress": if 2.0 <= t <= 4.0: # Hypo-coherence phase current_kappa[FrequencyBand.DELTA] = 0.15 current_kappa[FrequencyBand.THETA] = 0.20 current_kappa[FrequencyBand.BETA] = 0.18 elif 5.0 <= t <= 7.0: # Hyper-coherence phase current_kappa[FrequencyBand.ALPHA] = 0.92 current_kappa[FrequencyBand.GAMMA] = 0.88 current_kappa[FrequencyBand.BETA] = 0.85 elif scenario == "oscillatory": # Oscillating coherence for band in FrequencyBand: freq = ABCRConfig.BAND_FREQUENCIES[band] current_kappa[band] = 0.5 + 0.4 * np.sin(2 * np.pi * freq * t / 20) elif scenario == "cascade": # Cascading failure if t > 3.0: failed_bands = int((t - 3.0) / 1.5) for idx, band in enumerate(FrequencyBand): if idx < failed_bands: current_kappa[band] = 0.1 # Add noise for band in FrequencyBand: current_kappa[band] += np.random.normal(0, 0.02) current_kappa[band] = np.clip(current_kappa[band], 0, 1) # Process state recovered = self.process_coherence_state(current_kappa, current_phi, t) if recovered is not None: current_kappa = recovered coherence_history.append({ 'timestamp': t, 'kappa_state': current_kappa.copy(), 'recovered': recovered is not None }) return coherence_history def visualize_dynamics(self, coherence_history: List[Dict], save_path: str = None): """Visualize dual-stream coherence dynamics""" timestamps = [entry['timestamp'] for entry in coherence_history] kappa_values = {band: [] for band in FrequencyBand} for entry in coherence_history: for band in FrequencyBand: kappa_values[band].append(entry['kappa_state'][band]) fig, axes = plt.subplots(3, 1, figsize=(14, 12)) colors = { FrequencyBand.DELTA: 'blue', FrequencyBand.THETA: 'green', FrequencyBand.ALPHA: 'red', FrequencyBand.BETA: 'orange', FrequencyBand.GAMMA: 'purple' } # Plot 1: Coherence dynamics ax1 = axes[0] for band in FrequencyBand: ax1.plot(timestamps, kappa_values[band], label=band.value, color=colors[band], linewidth=2) # Add threshold regions ax1.axhspan(0, ABCRConfig.TAU_BASE, alpha=0.1, color='blue', label='Hypo-coherent zone') ax1.axhspan(1-ABCRConfig.TAU_BASE, 1, alpha=0.1, color='red', label='Hyper-coherent zone') ax1.axhline(y=0.5, color='gray', linestyle=':', alpha=0.5) ax1.set_ylabel('Coherence κ') ax1.set_title('Adaptive Bi-Coupled Coherence Recovery - Dual Stream Processing') ax1.legend(loc='upper right') ax1.grid(True, alpha=0.3) ax1.set_ylim(-0.05, 1.05) # Plot 2: Stream activity ax2 = axes[1] stream_A_activity = [] stream_B_activity = [] for event in self.system_history: if 'audit' in event: streams = event['audit']['active_streams'] t = event['timestamp'] if 'Stream A: Hypo-coherence' in streams: stream_A_activity.append(t) if 'Stream B: Hyper-coherence' in streams: stream_B_activity.append(t) if stream_A_activity: ax2.scatter(stream_A_activity, [0.3]*len(stream_A_activity), color='blue', s=50, alpha=0.7, label='Stream A (Hypo)') if stream_B_activity: ax2.scatter(stream_B_activity, [0.7]*len(stream_B_activity), color='red', s=50, alpha=0.7, label='Stream B (Hyper)') ax2.set_ylabel('Stream Activity') ax2.set_title('Dual-Stream Processing Activity') ax2.legend() ax2.grid(True, alpha=0.3) ax2.set_ylim(0, 1) # Plot 3: Audit scores ax3 = axes[2] audit_times = [] s_composite_vals = [] seam_types = [] for event in self.system_history: if 'audit' in event: audit_times.append(event['timestamp']) s_composite_vals.append(event['audit']['s_composite']) seam_types.append(event['audit']['seam_type']) if audit_times: # Color by seam type seam_colors = [] for seam in seam_types: if 'Type I' in seam: seam_colors.append('green') elif 'Type II' in seam: seam_colors.append('orange') else: seam_colors.append('red') ax3.scatter(audit_times, s_composite_vals, c=seam_colors, s=30, alpha=0.7) ax3.axhline(y=0, color='gray', linestyle='-', alpha=0.5) ax3.axhline(y=ABCRConfig.AUDIT_TOLERANCE, color='green', linestyle='--', alpha=0.5, label='Audit threshold') ax3.axhline(y=-ABCRConfig.AUDIT_TOLERANCE, color='green', linestyle='--', alpha=0.5) ax3.set_xlabel('Time (s)') ax3.set_ylabel('Composite Residual') ax3.set_title('Dual-Stream Audit Results') ax3.legend() ax3.grid(True, alpha=0.3) plt.tight_layout() if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') logger.info(f"Visualization saved to {save_path}") plt.show() def save_state(self, filepath: str): """Save system state to file""" state = { 'mode': self.mode.value, 'invariant_field': self.renewal_engine.Pi, 'system_history': self.system_history, 'renewal_history': self.renewal_engine.renewal_history, 'kappa_history': self.kappa_history[-10:] if self.kappa_history else [] } with open(filepath, 'w') as f: json.dump(state, f, indent=2, default=str) logger.info(f"System state saved to {filepath}") def load_state(self, filepath: str): """Load system state from file""" with open(filepath, 'r') as f: state = json.load(f) self.mode = SystemMode(state['mode']) self.renewal_engine.Pi = state['invariant_field'] self.system_history = state['system_history'] self.renewal_engine.renewal_history = state['renewal_history'] self.kappa_history = state['kappa_history'] logger.info(f"System state loaded from {filepath}") # ================================ DEMONSTRATION ================================ def demonstrate_abcr_system(): """Demonstrate the complete ABCR system with various scenarios""" print("=" * 70) print("ADAPTIVE BI-COUPLED COHERENCE RECOVERY (ABCR) SYSTEM") print("=" * 70) print() # Test different modes and scenarios scenarios = [ ("dual_stress", SystemMode.ADAPTIVE), ("oscillatory", SystemMode.HIGH_SENSITIVITY), ("cascade", SystemMode.RECOVERY) ] for scenario_name, mode in scenarios: print(f"\nTesting scenario: {scenario_name} with {mode.value} mode") print("-" * 50) # Initialize system system = AdaptiveBiCoupledCoherenceSystem(mode=mode) # Run simulation history = system.simulate_dynamics(duration=10.0, dt=0.1, scenario=scenario_name) # Analyze results successful_recoveries = sum(1 for event in system.system_history if event['event'] == 'successful_recovery') total_events = len(system.system_history) print(f"Results for {scenario_name}:") print(f" Total events: {len(history)}") print(f" Recovery attempts: {total_events}") print(f" Successful recoveries: {successful_recoveries}") if total_events > 0: print(f" Success rate: {successful_recoveries/total_events*100:.1f}%") # Count stream activity stream_A_count = sum(1 for event in system.system_history if 'audit' in event and 'Stream A: Hypo-coherence' in event['audit']['active_streams']) stream_B_count = sum(1 for event in system.system_history if 'audit' in event and 'Stream B: Hyper-coherence' in event['audit']['active_streams']) print(f" Stream A (hypo) activations: {stream_A_count}") print(f" Stream B (hyper) activations: {stream_B_count}") # Visualize system.visualize_dynamics(history, f"abcr_{scenario_name}_{mode.value}.png") # Save state system.save_state(f"abcr_state_{scenario_name}_{mode.value}.json") print() print("=" * 70) print("ABCR system demonstration complete!") print("=" * 70) if __name__ == "__main__": demonstrate_abcr_system()