# Source provenance: uploaded via Hugging Face ("Upload 10 files", commit 3ea65b2, user 9x25dillon).
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional, Any, Literal
from enum import Enum
import json
from datetime import datetime
import logging
import matplotlib.pyplot as plt
from scipy.special import expit as sigmoid
# Configure logging
# Module-wide logging: timestamped records at INFO level; every class below
# logs through this module-level logger.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ================================ ENUMS ================================
class FrequencyBand(Enum):
    """Canonical frequency bands used to index every coherence state dict."""
    DELTA = 'delta'
    THETA = 'theta'
    ALPHA = 'alpha'
    BETA = 'beta'
    GAMMA = 'gamma'
class StreamType(Enum):
    """Which processing stream handled a chain: hypo (A), hyper (B), or both."""
    STREAM_A = "Stream A: Hypo-coherence"
    STREAM_B = "Stream B: Hyper-coherence"
    DUAL = "Dual Stream"
class SeamType(Enum):
    """Audit classification of a reconstruction 'seam' (see DualStreamAuditor)."""
    TYPE_I = "Type I: Perfect Recovery"
    TYPE_II = "Type II: Acceptable Loss"
    TYPE_III = "Type III: Failed Recovery"
class SystemMode(Enum):
    """Operating mode selecting threshold/renewal parameter sets.

    NOTE(review): ABCRConfig.MODE_PARAMS as written defines entries only for
    STANDARD, HIGH_SENSITIVITY, STABILITY and RECOVERY — confirm how ADAPTIVE
    is meant to resolve its parameters before relying on a direct lookup.
    """
    STANDARD = "standard"
    HIGH_SENSITIVITY = "high_sensitivity"
    STABILITY = "stability"
    RECOVERY = "recovery"
    ADAPTIVE = "adaptive"
class ChainState(Enum):
    """Coherence state of a frequency-band chain."""
    HYPO = "hypo-coherent"
    HYPER = "hyper-coherent"
    INTACT = "intact"
# ================================ DATACLASSES ================================
@dataclass
class SpatialPosition:
    """A grid point carrying both Cartesian (x, y) and integer (m, n) indices."""
    x: float
    y: float
    m: int
    n: int

    def distance_to(self, other: 'SpatialPosition') -> float:
        """Euclidean distance between this position and *other*."""
        dx = self.x - other.x
        dy = self.y - other.y
        return np.sqrt(dx * dx + dy * dy)

    def radius(self) -> float:
        """Euclidean distance from the origin."""
        return np.sqrt(self.x * self.x + self.y * self.y)

    def angle(self) -> float:
        """Polar angle in radians, measured from the +x axis."""
        return np.arctan2(self.y, self.x)
@dataclass
class ChainComponent:
    """A broken (hypo- or hyper-coherent) chain detected in one band."""
    band: FrequencyBand               # frequency band the chain belongs to
    positions: List[SpatialPosition]  # grid positions with significant capsule amplitude
    coherence: float                  # band coherence κ at detection time
    phase_std: float                  # sqrt(1 - phase coherence): circular-spread proxy
    state: ChainState                 # HYPO or HYPER
    stream: StreamType                # stream assigned to process this chain
@dataclass
class DualAuditResult:
    """Outcome of a dual-stream integrity audit (produced by DualStreamAuditor)."""
    # Stream A metrics
    delta_kappa_A: float  # mean κ change over Stream A (hypo) chains
    s_A: float  # Stream A residual
    # Stream B metrics
    delta_kappa_B: float  # mean κ change over Stream B (hyper) chains
    s_B: float  # Stream B residual
    # Composite metrics
    s_composite: float  # chain-count-weighted blend of s_A / s_B used for pass/fail
    tau_R: float        # reconstruction latency |t_recon - t_original|
    D_C: float          # spectral curvature change
    D_omega: float      # entropy drift (std of per-band errors)
    R: float            # return credit: recovery-quality score
    I: float            # integrity index: exp(mean reconstructed κ)
    # Results
    seam_type: SeamType
    audit_pass: bool
    active_streams: List[StreamType]
    details: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AdaptiveThresholds:
    """Stress- and mode-adjusted detection thresholds."""
    tau_low: float    # κ below this is a hypo-coherence candidate
    tau_high: float   # κ above this is a hyper-coherence candidate
    tau_phase: float  # minimum phase coherence before a chain counts as broken
    alpha: float      # adaptive blending coefficient (grows with stress)
    stress: float     # system stress σ(t) in [0, 1] the thresholds derive from
    mode: SystemMode  # mode the thresholds were computed for
# ================================ CONFIGURATION ================================
class ABCRConfig:
    """Adaptive Bi-Coupled Coherence Recovery Configuration.

    Central read-only parameter store for grid geometry, detection thresholds,
    per-mode tuning, coupling strengths and convergence limits.
    """
    # Spatial grid: positions span m in [-M, M], n in [-N, N]
    SPATIAL_GRID_M = 8
    SPATIAL_GRID_N = 8
    SPATIAL_UNIT = 0.1        # grid spacing (spatial units per index step)
    PROPAGATION_SPEED = 1.0   # wave speed used to derive per-band wavenumbers
    # Base thresholds
    TAU_BASE = 0.3            # base hypo threshold; hyper threshold is 1 - TAU_BASE
    TAU_PHASE = 0.5           # minimum acceptable phase coherence
    # Mode-specific parameters.
    # Fix: an entry for SystemMode.ADAPTIVE was missing, so any direct
    # MODE_PARAMS[SystemMode.ADAPTIVE] lookup (threshold computation, renewal)
    # raised KeyError — the demonstration runs the system in ADAPTIVE mode.
    # ADAPTIVE sits between STANDARD and RECOVERY in responsiveness.
    MODE_PARAMS = {
        SystemMode.STANDARD: {
            'alpha_base': 0.6,
            'alpha_mod': 0.1,
            'rho': 0.7,
            'novelty': 0.1,
            'baseline': 0.6
        },
        SystemMode.HIGH_SENSITIVITY: {
            'alpha_base': 0.65,
            'alpha_mod': 0.15,
            'rho': 0.6,
            'novelty': 0.12,
            'baseline': 0.65,
            'tau_low_factor': 0.8,
            'tau_high_factor': 1.2
        },
        SystemMode.STABILITY: {
            'alpha_base': 0.5,
            'alpha_mod': 0.05,
            'rho': 0.8,
            'novelty': 0.05,
            'baseline': 0.5
        },
        SystemMode.RECOVERY: {
            'alpha_base': 0.65,
            'alpha_mod': 0.15,
            'rho': 0.6,
            'novelty': 0.15,
            'baseline': 0.7
        },
        SystemMode.ADAPTIVE: {
            'alpha_base': 0.6,
            'alpha_mod': 0.12,
            'rho': 0.65,
            'novelty': 0.1,
            'baseline': 0.6
        }
    }
    # Coupling parameters
    LAMBDA_CROSS_STREAM = 0.3  # Cross-stream coupling coefficient
    # Audit tolerances
    AUDIT_TOLERANCE = 0.01     # |s_composite| below this passes the audit
    TYPE_I_THRESHOLD = 1e-6    # |Δκ| below this upgrades a pass to Type I
    # Emergency thresholds
    EMERGENCY_HYPO_THRESHOLD = 0.1
    EMERGENCY_HYPER_THRESHOLD = 0.9
    # Convergence parameters
    MAX_RECONSTRUCTION_ITERATIONS = 100
    CONVERGENCE_TOLERANCE = 0.001
    # Frequency band center frequencies (Hz)
    BAND_FREQUENCIES = {
        FrequencyBand.DELTA: 2.0,
        FrequencyBand.THETA: 6.0,
        FrequencyBand.ALPHA: 10.0,
        FrequencyBand.BETA: 20.0,
        FrequencyBand.GAMMA: 40.0
    }
# ================================ CORE COMPONENTS ================================
class DualStreamEncoder:
    """Encodes states into forward and mirror capsules for bi-directional processing"""

    def __init__(self, M: int = ABCRConfig.SPATIAL_GRID_M,
                 N: int = ABCRConfig.SPATIAL_GRID_N):
        self.M = M
        self.N = N
        self.positions = self._initialize_positions()
        self.spatial_cache = {}

    def _initialize_positions(self) -> List[SpatialPosition]:
        """Build the full (2M+1) x (2N+1) grid of spatial positions."""
        unit = ABCRConfig.SPATIAL_UNIT
        return [
            SpatialPosition(m * unit, n * unit, m, n)
            for m in range(-self.M, self.M + 1)
            for n in range(-self.N, self.N + 1)
        ]

    def _assemble_capsule(self, kappa_bands: Dict[FrequencyBand, float],
                          phi_bands: Dict[FrequencyBand, float],
                          mirror: bool) -> np.ndarray:
        """Shared capsule assembly for both encodings.

        mirror=False: amplitude κ_b, phase φ_b - k_b*r (forward encoding).
        mirror=True:  amplitude 1-κ_b, phase (π - φ_b) + k_b*r (mirror encoding).
        Amplitudes are attenuated by the radial envelope G_r = exp(-r / (M * unit)).
        """
        num_bands = len(FrequencyBand)
        capsule = np.zeros((2 * self.M + 1, 2 * self.N + 1, num_bands), dtype=complex)
        for pos in self.positions:
            r = pos.radius()
            G_r = np.exp(-r / (self.M * ABCRConfig.SPATIAL_UNIT))
            for b_idx, band in enumerate(FrequencyBand):
                omega_b = ABCRConfig.BAND_FREQUENCIES[band]
                k_b = 2 * np.pi * omega_b / ABCRConfig.PROPAGATION_SPEED
                phase_shift = k_b * r
                if mirror:
                    amplitude = (1 - kappa_bands[band])
                    total_phase = (np.pi - phi_bands[band]) + phase_shift
                else:
                    amplitude = kappa_bands[band]
                    total_phase = phi_bands[band] - phase_shift
                capsule[pos.m + self.M, pos.n + self.N, b_idx] = \
                    G_r * amplitude * np.exp(1j * total_phase)
        return capsule

    def encode_forward_capsule(self, kappa_bands: Dict[FrequencyBand, float],
                               phi_bands: Dict[FrequencyBand, float]) -> np.ndarray:
        """Standard forward encoding: C_F[m,n,b] = G_r * κ_b * exp(iφ_b - ik_b*r)"""
        return self._assemble_capsule(kappa_bands, phi_bands, mirror=False)

    def encode_mirror_capsule(self, kappa_bands: Dict[FrequencyBand, float],
                              phi_bands: Dict[FrequencyBand, float]) -> np.ndarray:
        """Mirror encoding: C_M[m,n,b] = G_r * (1-κ_b) * exp(i(π-φ_b) + ik_b*r)"""
        return self._assemble_capsule(kappa_bands, phi_bands, mirror=True)

    def compute_spatial_coupling(self, pos1: SpatialPosition, pos2: SpatialPosition,
                                 band1: FrequencyBand, band2: FrequencyBand) -> float:
        """Compute spatial coupling strength between positions and bands"""
        key = (pos1.m, pos1.n, pos2.m, pos2.n, band1.value, band2.value)
        cached = self.spatial_cache.get(key)
        if cached is not None:
            return cached
        # Coupling decays exponentially with spatial separation and with the
        # gap between the two bands' center frequencies.
        spatial_factor = np.exp(-pos1.distance_to(pos2) / ABCRConfig.SPATIAL_UNIT)
        freq_gap = abs(ABCRConfig.BAND_FREQUENCIES[band1] -
                       ABCRConfig.BAND_FREQUENCIES[band2])
        coupling = spatial_factor * np.exp(-freq_gap / 10.0)
        self.spatial_cache[key] = coupling
        return coupling
class AdaptiveThresholdManager:
    """Manages adaptive thresholds based on system stress and mode"""

    def compute_system_stress(self, kappa: Dict[FrequencyBand, float],
                              kappa_history: List[Dict[FrequencyBand, float]] = None) -> float:
        """Compute system stress indicator σ(t), clipped to [0, 1].

        With history available, stress is the total per-band rate of change
        since the previous sample, normalized by the total coherence level.
        Without history, it falls back to the mean deviation from the
        balanced state κ = 0.5.
        """
        if kappa_history and len(kappa_history) > 0:
            # Compute rate of change
            prev_kappa = kappa_history[-1]
            dk_dt = {b: abs(kappa[b] - prev_kappa[b]) for b in FrequencyBand}
            numerator = sum(dk_dt.values())
            denominator = sum(kappa.values()) + 1e-10  # guard against all-zero κ
            stress = min(1.0, numerator / denominator)
        else:
            # Use deviation from balanced state (×2 maps the max deviation 0.5 to 1.0)
            balanced = 0.5
            deviations = [abs(k - balanced) for k in kappa.values()]
            stress = np.mean(deviations) * 2
        return np.clip(stress, 0, 1)

    def compute_adaptive_thresholds(self, stress: float, mode: SystemMode) -> AdaptiveThresholds:
        """Compute adaptive thresholds based on stress and mode.

        Fix: modes without an entry in ABCRConfig.MODE_PARAMS previously
        raised KeyError here; they now fall back to the STANDARD parameter
        set, so the system degrades gracefully for any mode value.
        """
        params = ABCRConfig.MODE_PARAMS.get(
            mode, ABCRConfig.MODE_PARAMS[SystemMode.STANDARD])
        # Base thresholds
        tau_base = ABCRConfig.TAU_BASE
        # Stress narrows the "normal" band from both sides symmetrically
        tau_low = tau_base * (1 - stress * 0.3)
        tau_high = 1 - tau_base * (1 - stress * 0.3)
        # Mode-specific adjustments
        if mode == SystemMode.HIGH_SENSITIVITY:
            tau_low *= params.get('tau_low_factor', 0.8)
            tau_high *= params.get('tau_high_factor', 1.2)
        elif mode == SystemMode.STABILITY:
            tau_low *= 1.1
            tau_high *= 0.9
        # Adaptive alpha grows linearly with stress
        alpha = params['alpha_base'] + params['alpha_mod'] * stress
        # Phase coherence threshold (less adaptive)
        tau_phase = ABCRConfig.TAU_PHASE * (1 + stress * 0.1)
        return AdaptiveThresholds(
            tau_low=np.clip(tau_low, 0.1, 0.5),
            tau_high=np.clip(tau_high, 0.5, 0.9),
            tau_phase=tau_phase,
            alpha=np.clip(alpha, 0.3, 0.8),
            stress=stress,
            mode=mode
        )
class DualStreamProcessor:
    """Processes both hypo and hyper coherence streams"""

    def __init__(self, encoder: DualStreamEncoder):
        self.encoder = encoder
        # NOTE(review): embedding_cache is never read or written inside this
        # class — confirm whether it is dead state or used by external callers.
        self.embedding_cache = {}

    def detect_broken_chains(self, kappa: Dict[FrequencyBand, float],
                             C_F: np.ndarray, C_M: np.ndarray,
                             thresholds: AdaptiveThresholds) -> Tuple[List[ChainComponent],
                                                                      List[ChainComponent],
                                                                      Dict[FrequencyBand, float]]:
        """Detect broken chains in both streams.

        A band with κ < tau_low is a Stream A (hypo) candidate checked against
        the forward capsule; κ > tau_high is a Stream B (hyper) candidate
        checked against the mirror capsule. A candidate only counts as broken
        when its capsule phase coherence also falls below tau_phase; all other
        bands are returned in the ``intact`` dict.
        """
        broken_A = []  # Hypo-coherent chains
        broken_B = []  # Hyper-coherent chains
        intact = {}
        for b_idx, band in enumerate(FrequencyBand):
            kappa_b = kappa[band]
            if kappa_b < thresholds.tau_low:
                # Hypo-coherent: use forward capsule
                phase_coh = self._compute_phase_coherence(C_F, band, b_idx)
                if phase_coh < thresholds.tau_phase:
                    component = ChainComponent(
                        band=band,
                        positions=self._get_significant_positions(C_F, b_idx),
                        coherence=kappa_b,
                        # sqrt(1 - resultant length) as a circular-spread proxy
                        phase_std=np.sqrt(1 - phase_coh),
                        state=ChainState.HYPO,
                        stream=StreamType.STREAM_A
                    )
                    broken_A.append(component)
                    logger.info(f"Stream A - Hypo-coherent chain: {band.value} "
                                f"(κ={kappa_b:.3f}, phase_coh={phase_coh:.3f})")
                else:
                    # Low κ but phases still aligned: treated as intact
                    intact[band] = kappa_b
            elif kappa_b > thresholds.tau_high:
                # Hyper-coherent: use mirror capsule
                phase_coh = self._compute_phase_coherence(C_M, band, b_idx)
                if phase_coh < thresholds.tau_phase:
                    component = ChainComponent(
                        band=band,
                        positions=self._get_significant_positions(C_M, b_idx),
                        coherence=kappa_b,
                        phase_std=np.sqrt(1 - phase_coh),
                        state=ChainState.HYPER,
                        stream=StreamType.STREAM_B
                    )
                    broken_B.append(component)
                    logger.info(f"Stream B - Hyper-coherent chain: {band.value} "
                                f"(κ={kappa_b:.3f}, phase_coh={phase_coh:.3f})")
                else:
                    intact[band] = kappa_b
            else:
                intact[band] = kappa_b
        return broken_A, broken_B, intact

    def _compute_phase_coherence(self, capsule: np.ndarray, band: FrequencyBand,
                                 b_idx: int) -> float:
        """Compute phase coherence for a band.

        Returns the circular resultant length of the phases of all capsule
        entries with non-negligible amplitude: 1.0 means perfectly aligned
        phases, 0.0 means uniformly spread (or fewer than 2 usable samples).
        """
        phases = []
        for pos in self.encoder.positions:
            value = capsule[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx]
            if np.abs(value) > 1e-6:
                phases.append(np.angle(value))
        if len(phases) < 2:
            return 0.0
        # Compute circular variance
        mean_vector = np.mean(np.exp(1j * np.array(phases)))
        coherence = np.abs(mean_vector)
        return coherence

    def _get_significant_positions(self, capsule: np.ndarray, b_idx: int,
                                   threshold: float = 1e-3) -> List[SpatialPosition]:
        """Get positions with significant amplitude (|value| > threshold)."""
        positions = []
        for pos in self.encoder.positions:
            amplitude = np.abs(capsule[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx])
            if amplitude > threshold:
                positions.append(pos)
        return positions
class BiCoupledReconstructor:
    """Reconstructs coherence using bi-coupled Hamiltonians"""

    def __init__(self, encoder: DualStreamEncoder):
        self.encoder = encoder
        # NOTE(review): entries accumulate across calls to compute_hamiltonians;
        # reconstruct() only reads the currently-broken bands, so stale entries
        # are never consumed, but the dicts grow monotonically.
        self.H_A = {}  # Stream A Hamiltonians
        self.H_B = {}  # Stream B Hamiltonians

    def compute_hamiltonians(self, broken_A: List[ChainComponent],
                             broken_B: List[ChainComponent],
                             intact: Dict[FrequencyBand, float],
                             C_F: np.ndarray, C_M: np.ndarray):
        """Compute bi-coupled Hamiltonians for both streams.

        For each broken band: sum the primary field over the chain's
        positions, add spatial coupling to every intact band (weighted by the
        intact κ for Stream A, by 1-κ for Stream B), and add a cross-stream
        term scaled by LAMBDA_CROSS_STREAM from the opposite field.
        """
        # Stream A Hamiltonians (for hypo-coherent chains)
        for component in broken_A:
            band = component.band
            b_idx = list(FrequencyBand).index(band)
            # Forward field bias: complex sum over the chain's positions
            h_F = 0
            for pos in component.positions:
                h_F += C_F[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx]
            # Mirror field contribution
            h_M = 0
            for pos in component.positions:
                h_M += C_M[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx]
            # Coupling to intact bands (every chain position against the
            # whole grid; cached in the encoder, weighted by intact κ)
            J_coupling = 0
            for intact_band, intact_val in intact.items():
                for pos1 in component.positions:
                    for pos2 in self.encoder.positions:
                        coupling = self.encoder.compute_spatial_coupling(
                            pos1, pos2, band, intact_band
                        )
                        J_coupling += coupling * intact_val
            # Bi-coupled Hamiltonian for Stream A
            self.H_A[band] = h_F + J_coupling + ABCRConfig.LAMBDA_CROSS_STREAM * h_M
        # Stream B Hamiltonians (for hyper-coherent chains)
        for component in broken_B:
            band = component.band
            b_idx = list(FrequencyBand).index(band)
            # Mirror field bias
            h_M = 0
            for pos in component.positions:
                h_M += C_M[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx]
            # Forward field contribution
            h_F = 0
            for pos in component.positions:
                h_F += C_F[pos.m + self.encoder.M, pos.n + self.encoder.N, b_idx]
            # Coupling to intact bands (inverted: weighted by 1 - κ)
            J_coupling = 0
            for intact_band, intact_val in intact.items():
                for pos1 in component.positions:
                    for pos2 in self.encoder.positions:
                        coupling = self.encoder.compute_spatial_coupling(
                            pos1, pos2, band, intact_band
                        )
                        J_coupling += coupling * (1 - intact_val)
            # Bi-coupled Hamiltonian for Stream B
            self.H_B[band] = h_M + J_coupling + ABCRConfig.LAMBDA_CROSS_STREAM * h_F

    def reconstruct(self, broken_A: List[ChainComponent],
                    broken_B: List[ChainComponent],
                    intact: Dict[FrequencyBand, float]) -> Dict[FrequencyBand, float]:
        """Iteratively reconstruct broken chains using bi-coupled fields.

        NOTE(review): the Hamiltonians are fixed during this loop, so each
        band's target κ is constant — the loop reaches tolerance on the
        second pass and reports convergence then. Intact bands are passed
        through unchanged.
        """
        kappa_recon = intact.copy()
        # Initialize broken bands away from their extremes
        for component in broken_A:
            kappa_recon[component.band] = 0.3  # Start from low coherence
        for component in broken_B:
            kappa_recon[component.band] = 0.7  # Start from high coherence
        # Iterative reconstruction
        for iteration in range(ABCRConfig.MAX_RECONSTRUCTION_ITERATIONS):
            converged = True
            # Reconstruct hypo-coherent chains
            for component in broken_A:
                band = component.band
                field = np.abs(self.H_A.get(band, 0))
                # Sigmoid activation for Stream A (maps field strength to κ)
                kappa_new = sigmoid(field)
                if abs(kappa_new - kappa_recon[band]) > ABCRConfig.CONVERGENCE_TOLERANCE:
                    converged = False
                kappa_recon[band] = kappa_new
            # Reconstruct hyper-coherent chains
            for component in broken_B:
                band = component.band
                field = np.abs(self.H_B.get(band, 0))
                # Inverted sigmoid for Stream B (stronger field → lower κ)
                kappa_new = 1 - sigmoid(field)
                if abs(kappa_new - kappa_recon[band]) > ABCRConfig.CONVERGENCE_TOLERANCE:
                    converged = False
                kappa_recon[band] = kappa_new
            if converged:
                logger.info(f"Reconstruction converged in {iteration+1} iterations")
                break
        return kappa_recon
class DualStreamAuditor:
    """Performs integrity audit across both processing streams"""

    def audit(self, kappa_original: Dict[FrequencyBand, float],
              kappa_recon: Dict[FrequencyBand, float],
              broken_A: List[ChainComponent],
              broken_B: List[ChainComponent],
              timestamp_original: float,
              timestamp_recon: float) -> DualAuditResult:
        """Perform dual-stream integrity audit.

        Per-stream residuals are s = R*τ_R - (Δκ + D_ω + D_C); a stream with
        no broken chains contributes 0. The composite residual is the
        chain-count-weighted blend of the active streams, and the audit
        passes when |s_composite| < AUDIT_TOLERANCE.
        """
        # Compute per-stream metrics
        delta_kappa_A = self._compute_stream_delta(kappa_original, kappa_recon, broken_A)
        delta_kappa_B = self._compute_stream_delta(kappa_original, kappa_recon, broken_B)
        # Common metrics
        tau_R = abs(timestamp_recon - timestamp_original)
        D_C = self._compute_curvature_change(kappa_original, kappa_recon)
        D_omega = self._compute_entropy_drift(kappa_original, kappa_recon)
        R = self._compute_return_credit(kappa_original, kappa_recon)
        # Stream residuals
        s_A = R * tau_R - (delta_kappa_A + D_omega + D_C) if broken_A else 0
        s_B = R * tau_R - (delta_kappa_B + D_omega + D_C) if broken_B else 0
        # Composite score with weighting
        active_streams = []
        if broken_A:
            active_streams.append(StreamType.STREAM_A)
        if broken_B:
            active_streams.append(StreamType.STREAM_B)
        if broken_A and broken_B:
            # Weight each stream by its share of broken chains
            w_A = len(broken_A) / (len(broken_A) + len(broken_B))
            w_B = len(broken_B) / (len(broken_A) + len(broken_B))
            s_composite = w_A * s_A + w_B * s_B
        elif broken_A:
            s_composite = s_A
        elif broken_B:
            s_composite = s_B
        else:
            s_composite = 0
        # Determine seam type: a passing audit is Type I only when the mean
        # κ change is essentially zero, otherwise Type II (acceptable loss)
        delta_kappa_avg = np.mean([kappa_recon[b] - kappa_original[b]
                                   for b in FrequencyBand])
        if abs(s_composite) < ABCRConfig.AUDIT_TOLERANCE:
            if abs(delta_kappa_avg) < ABCRConfig.TYPE_I_THRESHOLD:
                seam_type = SeamType.TYPE_I
            else:
                seam_type = SeamType.TYPE_II
            audit_pass = True
        else:
            seam_type = SeamType.TYPE_III
            audit_pass = False
        # Compute integrity index (exp of mean reconstructed κ)
        kappa_final = np.mean(list(kappa_recon.values()))
        I = np.exp(kappa_final)
        result = DualAuditResult(
            delta_kappa_A=delta_kappa_A,
            s_A=s_A,
            delta_kappa_B=delta_kappa_B,
            s_B=s_B,
            s_composite=s_composite,
            tau_R=tau_R,
            D_C=D_C,
            D_omega=D_omega,
            R=R,
            I=I,
            seam_type=seam_type,
            audit_pass=audit_pass,
            active_streams=active_streams,
            details={
                'broken_A_count': len(broken_A),
                'broken_B_count': len(broken_B),
                'delta_kappa_avg': delta_kappa_avg
            }
        )
        logger.info(f"Dual-stream audit: {seam_type.value}, "
                    f"s_composite={s_composite:.6f}, pass={audit_pass}")
        return result

    def _compute_stream_delta(self, original: Dict[FrequencyBand, float],
                              recon: Dict[FrequencyBand, float],
                              broken: List[ChainComponent]) -> float:
        """Compute the mean κ change over a stream's broken bands (0 if none)."""
        if not broken:
            return 0
        deltas = [recon[c.band] - original[c.band] for c in broken]
        return np.mean(deltas) if deltas else 0

    def _compute_curvature_change(self, original: Dict[FrequencyBand, float],
                                  recon: Dict[FrequencyBand, float]) -> float:
        """Compute change in spectral curvature.

        Curvature is the mean |second difference| of the κ values taken in
        band order; returns 0 when fewer than 3 bands are present.
        """
        original_vals = np.array(list(original.values()))
        recon_vals = np.array(list(recon.values()))
        if len(original_vals) >= 3:
            original_curvature = np.mean(np.abs(np.diff(original_vals, n=2)))
            recon_curvature = np.mean(np.abs(np.diff(recon_vals, n=2)))
            return abs(recon_curvature - original_curvature)
        return 0.0

    def _compute_entropy_drift(self, original: Dict[FrequencyBand, float],
                               recon: Dict[FrequencyBand, float]) -> float:
        """Compute entropy-based drift metric: std of the per-band errors."""
        errors = [recon[b] - original[b] for b in FrequencyBand]
        return np.std(errors)

    def _compute_return_credit(self, original: Dict[FrequencyBand, float],
                               recon: Dict[FrequencyBand, float]) -> float:
        """Compute return credit based on recovery quality.

        Each band with non-zero original κ scores 1 - |1 - recon/original|
        (ratio clipped to [0, 2]); the credit is the mean over scored bands.
        """
        ratios = []
        for band in FrequencyBand:
            if original[band] > 0:
                ratio = recon[band] / original[band]
                ratio = np.clip(ratio, 0, 2)
                ratios.append(1 - abs(1 - ratio))
        return np.mean(ratios) if ratios else 0.0
class AdaptiveRenewalEngine:
    """Manages cognitive renewal with mode-dependent strategies"""

    def __init__(self):
        self.Pi = None  # Invariant field: EMA of healthy κ states, per band
        self.renewal_history = []
        # NOTE(review): release_history is never appended to in this class —
        # confirm whether external callers use it.
        self.release_history = []

    def initialize_invariant_field(self, kappa_initial: Dict[FrequencyBand, float]):
        """Initialize the invariant field Pi from a starting κ state."""
        self.Pi = kappa_initial.copy()
        logger.info(f"Invariant field initialized: mean κ = {np.mean(list(self.Pi.values())):.3f}")

    def update_invariant_field(self, kappa_current: Dict[FrequencyBand, float],
                               beta: float = 0.1):
        """Update invariant field with an exponential moving average.

        beta is the update weight of the new state; initializes Pi on first use.
        """
        if self.Pi is None:
            self.initialize_invariant_field(kappa_current)
            return
        for band in FrequencyBand:
            self.Pi[band] = (1 - beta) * self.Pi[band] + beta * kappa_current[band]

    def perform_renewal(self, kappa_fragmented: Dict[FrequencyBand, float],
                        mode: SystemMode) -> Dict[FrequencyBand, float]:
        """Perform adaptive renewal based on mode.

        Each band is renewed as rho*Pi + (1-rho)*baseline + Gaussian novelty,
        clipped to [0, 1]. Fix: modes missing from ABCRConfig.MODE_PARAMS
        previously raised KeyError; they now fall back to STANDARD parameters.
        """
        if self.Pi is None:
            logger.warning("Cannot renew: Pi not initialized")
            return kappa_fragmented
        params = ABCRConfig.MODE_PARAMS.get(
            mode, ABCRConfig.MODE_PARAMS[SystemMode.STANDARD])
        rho = params['rho']
        novelty = params['novelty']
        baseline = params['baseline']
        kappa_renewed = {}
        for band in FrequencyBand:
            xi = np.random.normal(0, novelty)  # stochastic novelty injection
            kappa_renewed[band] = (
                rho * self.Pi[band] +
                (1 - rho) * baseline +
                xi
            )
            kappa_renewed[band] = np.clip(kappa_renewed[band], 0, 1)
        self.renewal_history.append({
            'timestamp': datetime.now().isoformat(),
            'mode': mode.value,
            'kappa_before': kappa_fragmented.copy(),
            'kappa_after': kappa_renewed.copy(),
            'Pi_state': self.Pi.copy()
        })
        logger.info(f"Renewal ({mode.value}): mean κ before={np.mean(list(kappa_fragmented.values())):.3f}, "
                    f"after={np.mean(list(kappa_renewed.values())):.3f}")
        return kappa_renewed
# ================================ MAIN SYSTEM ================================
class AdaptiveBiCoupledCoherenceSystem:
    """Main ABCR system orchestrating all components.

    Pipeline per time step: stress → adaptive thresholds → dual capsule
    encoding → broken-chain detection → emergency check → bi-coupled
    reconstruction → audit → renewal (or fallback).
    """

    def __init__(self, mode: SystemMode = SystemMode.STANDARD):
        self.mode = mode
        self.encoder = DualStreamEncoder()
        self.threshold_manager = AdaptiveThresholdManager()
        self.processor = DualStreamProcessor(self.encoder)
        self.reconstructor = BiCoupledReconstructor(self.encoder)
        self.auditor = DualStreamAuditor()
        self.renewal_engine = AdaptiveRenewalEngine()
        self.current_capsules = {'forward': None, 'mirror': None}
        self.kappa_history = []   # accepted κ states, FrequencyBand-keyed dicts
        self.system_history = []  # recovery events recorded by _record_event
        logger.info(f"ABCR System initialized in {mode.value} mode")

    def process_coherence_state(self, kappa: Dict[FrequencyBand, float],
                                phi: Dict[FrequencyBand, float],
                                timestamp: float) -> Optional[Dict[FrequencyBand, float]]:
        """Main processing pipeline.

        Returns the accepted κ state (input, reconstructed-and-renewed, or
        fallback), or None when an emergency decouple is triggered.
        """
        # Step 1: Compute adaptive thresholds
        stress = self.threshold_manager.compute_system_stress(kappa, self.kappa_history)
        thresholds = self.threshold_manager.compute_adaptive_thresholds(stress, self.mode)
        logger.info(f"System stress: {stress:.3f}, tau_low={thresholds.tau_low:.3f}, "
                    f"tau_high={thresholds.tau_high:.3f}, alpha={thresholds.alpha:.3f}")
        # Step 2: Encode dual capsules
        C_F = self.encoder.encode_forward_capsule(kappa, phi)
        C_M = self.encoder.encode_mirror_capsule(kappa, phi)
        self.current_capsules = {'forward': C_F, 'mirror': C_M}
        # Step 3: Detect broken chains
        broken_A, broken_B, intact = self.processor.detect_broken_chains(
            kappa, C_F, C_M, thresholds
        )
        # Step 4: Check emergency conditions
        if self._check_emergency_conditions(kappa, broken_A, broken_B):
            logger.critical("EMERGENCY DECOUPLE TRIGGERED")
            return None
        # Step 5: Handle based on detection results
        if not broken_A and not broken_B:
            logger.info("No broken chains detected - state stable")
            self.kappa_history.append(kappa.copy())
            return kappa
        logger.info("=" * 70)
        logger.info(f"DUAL-STREAM RECOVERY: {len(broken_A)} hypo, {len(broken_B)} hyper chains")
        logger.info("=" * 70)
        # Step 6: Compute bi-coupled Hamiltonians
        self.reconstructor.compute_hamiltonians(broken_A, broken_B, intact, C_F, C_M)
        # Step 7: Reconstruct
        kappa_recon = self.reconstructor.reconstruct(broken_A, broken_B, intact)
        # Step 8: Audit (synthesize a prior timestamp 0.1s back when history exists)
        audit_result = self.auditor.audit(
            kappa, kappa_recon, broken_A, broken_B,
            timestamp - 0.1 if self.kappa_history else timestamp,
            timestamp
        )
        # Step 9: Apply renewal if audit passes
        if audit_result.audit_pass:
            kappa_final = self.renewal_engine.perform_renewal(kappa_recon, self.mode)
            self.renewal_engine.update_invariant_field(kappa_final)
            self._record_event('successful_recovery', timestamp, kappa, kappa_final, audit_result)
            self.kappa_history.append(kappa_final.copy())
            logger.info(f"✓ Recovery successful: {audit_result.seam_type.value}")
            logger.info("=" * 70)
            return kappa_final
        else:
            logger.error(f"✗ Audit failed: {audit_result.seam_type.value}")
            self._record_event('failed_recovery', timestamp, kappa, kappa_recon, audit_result)
            logger.info("=" * 70)
            # Fallback recovery
            return self._fallback_recovery(kappa)

    def _check_emergency_conditions(self, kappa: Dict[FrequencyBand, float],
                                    broken_A: List[ChainComponent],
                                    broken_B: List[ChainComponent]) -> bool:
        """Check for emergency decouple conditions.

        Triggers on extreme hypo/hyper coherence in any band, or when every
        band is broken across the two streams.
        """
        min_kappa = min(kappa.values())
        max_kappa = max(kappa.values())
        if min_kappa < ABCRConfig.EMERGENCY_HYPO_THRESHOLD:
            logger.critical(f"Emergency: Extreme hypo-coherence (min={min_kappa:.3f})")
            return True
        if max_kappa > ABCRConfig.EMERGENCY_HYPER_THRESHOLD:
            logger.critical(f"Emergency: Extreme hyper-coherence (max={max_kappa:.3f})")
            return True
        if len(broken_A) + len(broken_B) >= len(FrequencyBand):
            logger.critical("Emergency: All bands broken")
            return True
        return False

    def _fallback_recovery(self, kappa: Dict[FrequencyBand, float]) -> Dict[FrequencyBand, float]:
        """Fallback recovery when audit fails: invariant field, else balanced 0.5."""
        if self.renewal_engine.Pi is not None:
            # Use invariant field as fallback
            return self.renewal_engine.Pi.copy()
        else:
            # Default to balanced state
            return {band: 0.5 for band in FrequencyBand}

    def _record_event(self, event_type: str, timestamp: float,
                      kappa_before: Dict[FrequencyBand, float],
                      kappa_after: Dict[FrequencyBand, float],
                      audit_result: DualAuditResult):
        """Record system event for history (consumed by visualization/save)."""
        self.system_history.append({
            'timestamp': timestamp,
            'event': event_type,
            'kappa_before': kappa_before.copy(),
            'kappa_after': kappa_after.copy(),
            'audit': {
                'seam_type': audit_result.seam_type.value,
                's_composite': audit_result.s_composite,
                's_A': audit_result.s_A,
                's_B': audit_result.s_B,
                'active_streams': [s.value for s in audit_result.active_streams]
            }
        })

    def set_mode(self, mode: SystemMode):
        """Change operational mode"""
        self.mode = mode
        logger.info(f"System mode changed to: {mode.value}")

    def simulate_dynamics(self, duration: float = 10.0, dt: float = 0.1,
                          scenario: str = "dual_stress") -> List[Dict]:
        """Simulate coherence dynamics with various scenarios.

        Scenarios: "dual_stress" (timed hypo then hyper episodes),
        "oscillatory" (per-band sinusoidal κ), "cascade" (progressive band
        failure after t=3). Gaussian noise is added each step; any state the
        pipeline accepts becomes the next step's input.
        """
        time_points = int(duration / dt)
        coherence_history = []
        # Initialize
        kappa_baseline = {
            FrequencyBand.DELTA: 0.72,
            FrequencyBand.THETA: 0.68,
            FrequencyBand.ALPHA: 0.75,
            FrequencyBand.BETA: 0.70,
            FrequencyBand.GAMMA: 0.65
        }
        phi_baseline = {
            FrequencyBand.DELTA: 0.1,
            FrequencyBand.THETA: 0.3,
            FrequencyBand.ALPHA: 0.5,
            FrequencyBand.BETA: 0.7,
            FrequencyBand.GAMMA: 0.9
        }
        current_kappa = kappa_baseline.copy()
        current_phi = phi_baseline.copy()
        # Initialize invariant field
        self.renewal_engine.initialize_invariant_field(current_kappa)
        for i in range(time_points):
            t = i * dt
            # Apply scenario
            if scenario == "dual_stress":
                if 2.0 <= t <= 4.0:
                    # Hypo-coherence phase
                    current_kappa[FrequencyBand.DELTA] = 0.15
                    current_kappa[FrequencyBand.THETA] = 0.20
                    current_kappa[FrequencyBand.BETA] = 0.18
                elif 5.0 <= t <= 7.0:
                    # Hyper-coherence phase
                    current_kappa[FrequencyBand.ALPHA] = 0.92
                    current_kappa[FrequencyBand.GAMMA] = 0.88
                    current_kappa[FrequencyBand.BETA] = 0.85
            elif scenario == "oscillatory":
                # Oscillating coherence
                for band in FrequencyBand:
                    freq = ABCRConfig.BAND_FREQUENCIES[band]
                    current_kappa[band] = 0.5 + 0.4 * np.sin(2 * np.pi * freq * t / 20)
            elif scenario == "cascade":
                # Cascading failure: one more band fails every 1.5s after t=3
                if t > 3.0:
                    failed_bands = int((t - 3.0) / 1.5)
                    for idx, band in enumerate(FrequencyBand):
                        if idx < failed_bands:
                            current_kappa[band] = 0.1
            # Add noise
            for band in FrequencyBand:
                current_kappa[band] += np.random.normal(0, 0.02)
                current_kappa[band] = np.clip(current_kappa[band], 0, 1)
            # Process state
            recovered = self.process_coherence_state(current_kappa, current_phi, t)
            if recovered is not None:
                current_kappa = recovered
            coherence_history.append({
                'timestamp': t,
                'kappa_state': current_kappa.copy(),
                'recovered': recovered is not None
            })
        return coherence_history

    def visualize_dynamics(self, coherence_history: List[Dict], save_path: Optional[str] = None):
        """Visualize dual-stream coherence dynamics.

        Three stacked panels: per-band κ traces with threshold zones,
        per-stream activity markers, and audit residuals colored by seam type.
        Saves to *save_path* when given, then shows the figure.
        """
        timestamps = [entry['timestamp'] for entry in coherence_history]
        kappa_values = {band: [] for band in FrequencyBand}
        for entry in coherence_history:
            for band in FrequencyBand:
                kappa_values[band].append(entry['kappa_state'][band])
        fig, axes = plt.subplots(3, 1, figsize=(14, 12))
        colors = {
            FrequencyBand.DELTA: 'blue',
            FrequencyBand.THETA: 'green',
            FrequencyBand.ALPHA: 'red',
            FrequencyBand.BETA: 'orange',
            FrequencyBand.GAMMA: 'purple'
        }
        # Plot 1: Coherence dynamics
        ax1 = axes[0]
        for band in FrequencyBand:
            ax1.plot(timestamps, kappa_values[band], label=band.value,
                     color=colors[band], linewidth=2)
        # Add threshold regions
        ax1.axhspan(0, ABCRConfig.TAU_BASE, alpha=0.1, color='blue',
                    label='Hypo-coherent zone')
        ax1.axhspan(1-ABCRConfig.TAU_BASE, 1, alpha=0.1, color='red',
                    label='Hyper-coherent zone')
        ax1.axhline(y=0.5, color='gray', linestyle=':', alpha=0.5)
        ax1.set_ylabel('Coherence κ')
        ax1.set_title('Adaptive Bi-Coupled Coherence Recovery - Dual Stream Processing')
        ax1.legend(loc='upper right')
        ax1.grid(True, alpha=0.3)
        ax1.set_ylim(-0.05, 1.05)
        # Plot 2: Stream activity
        ax2 = axes[1]
        stream_A_activity = []
        stream_B_activity = []
        for event in self.system_history:
            if 'audit' in event:
                streams = event['audit']['active_streams']
                t = event['timestamp']
                if 'Stream A: Hypo-coherence' in streams:
                    stream_A_activity.append(t)
                if 'Stream B: Hyper-coherence' in streams:
                    stream_B_activity.append(t)
        if stream_A_activity:
            ax2.scatter(stream_A_activity, [0.3]*len(stream_A_activity),
                        color='blue', s=50, alpha=0.7, label='Stream A (Hypo)')
        if stream_B_activity:
            ax2.scatter(stream_B_activity, [0.7]*len(stream_B_activity),
                        color='red', s=50, alpha=0.7, label='Stream B (Hyper)')
        ax2.set_ylabel('Stream Activity')
        ax2.set_title('Dual-Stream Processing Activity')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        ax2.set_ylim(0, 1)
        # Plot 3: Audit scores
        ax3 = axes[2]
        audit_times = []
        s_composite_vals = []
        seam_types = []
        for event in self.system_history:
            if 'audit' in event:
                audit_times.append(event['timestamp'])
                s_composite_vals.append(event['audit']['s_composite'])
                seam_types.append(event['audit']['seam_type'])
        if audit_times:
            # Color by seam type
            seam_colors = []
            for seam in seam_types:
                if 'Type I' in seam:
                    seam_colors.append('green')
                elif 'Type II' in seam:
                    seam_colors.append('orange')
                else:
                    seam_colors.append('red')
            ax3.scatter(audit_times, s_composite_vals, c=seam_colors, s=30, alpha=0.7)
        ax3.axhline(y=0, color='gray', linestyle='-', alpha=0.5)
        ax3.axhline(y=ABCRConfig.AUDIT_TOLERANCE, color='green',
                    linestyle='--', alpha=0.5, label='Audit threshold')
        ax3.axhline(y=-ABCRConfig.AUDIT_TOLERANCE, color='green',
                    linestyle='--', alpha=0.5)
        ax3.set_xlabel('Time (s)')
        ax3.set_ylabel('Composite Residual')
        ax3.set_title('Dual-Stream Audit Results')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            logger.info(f"Visualization saved to {save_path}")
        plt.show()

    def _json_safe(self, obj):
        """Recursively replace Enum dict keys/values with their .value.

        json.dump rejects non-string dict keys outright (the ``default=``
        hook is only consulted for values), so every FrequencyBand-keyed κ
        dict in the state must be converted before serialization.
        """
        if isinstance(obj, dict):
            return {(k.value if isinstance(k, Enum) else k): self._json_safe(v)
                    for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._json_safe(item) for item in obj]
        if isinstance(obj, Enum):
            return obj.value
        return obj

    def save_state(self, filepath: str):
        """Save system state to file.

        Fix: the state previously contained FrequencyBand-keyed dicts (Pi,
        κ history, event histories), making json.dump raise TypeError on
        every save once Pi was initialized; the state is now passed through
        _json_safe first. ``default=str`` remains as a catch-all for
        non-JSON values (e.g. numpy scalars).
        """
        state = {
            'mode': self.mode.value,
            'invariant_field': self.renewal_engine.Pi,
            'system_history': self.system_history,
            'renewal_history': self.renewal_engine.renewal_history,
            'kappa_history': self.kappa_history[-10:] if self.kappa_history else []
        }
        with open(filepath, 'w') as f:
            json.dump(self._json_safe(state), f, indent=2, default=str)
        logger.info(f"System state saved to {filepath}")

    def load_state(self, filepath: str):
        """Load system state from file.

        κ dicts the running system consumes (invariant field, κ history) are
        rebuilt with FrequencyBand keys from their saved band values. The
        event histories are kept in JSON (string-keyed) form since they are
        only used for reporting and visualization.
        """
        with open(filepath, 'r') as f:
            state = json.load(f)
        self.mode = SystemMode(state['mode'])
        pi = state['invariant_field']
        self.renewal_engine.Pi = (
            {FrequencyBand(k): v for k, v in pi.items()} if pi is not None else None)
        self.system_history = state['system_history']
        self.renewal_engine.renewal_history = state['renewal_history']
        self.kappa_history = [
            {FrequencyBand(k): v for k, v in entry.items()}
            for entry in state['kappa_history']]
        logger.info(f"System state loaded from {filepath}")
# ================================ DEMONSTRATION ================================
def demonstrate_abcr_system():
    """Demonstrate the complete ABCR system with various scenarios"""
    banner = "=" * 70
    print(banner)
    print("ADAPTIVE BI-COUPLED COHERENCE RECOVERY (ABCR) SYSTEM")
    print(banner)
    print()
    # Scenario / mode pairs exercised by the demo
    test_matrix = [
        ("dual_stress", SystemMode.ADAPTIVE),
        ("oscillatory", SystemMode.HIGH_SENSITIVITY),
        ("cascade", SystemMode.RECOVERY)
    ]
    for scenario_name, mode in test_matrix:
        print(f"\nTesting scenario: {scenario_name} with {mode.value} mode")
        print("-" * 50)
        # Fresh system per scenario so histories don't bleed across runs
        system = AdaptiveBiCoupledCoherenceSystem(mode=mode)
        history = system.simulate_dynamics(duration=10.0, dt=0.1, scenario=scenario_name)
        # Summarize recovery outcomes
        events = system.system_history
        successful_recoveries = sum(
            1 for event in events if event['event'] == 'successful_recovery')
        total_events = len(events)
        print(f"Results for {scenario_name}:")
        print(f" Total events: {len(history)}")
        print(f" Recovery attempts: {total_events}")
        print(f" Successful recoveries: {successful_recoveries}")
        if total_events > 0:
            print(f" Success rate: {successful_recoveries/total_events*100:.1f}%")
        # Count per-stream activity
        stream_A_count = sum(
            1 for event in events
            if 'audit' in event
            and 'Stream A: Hypo-coherence' in event['audit']['active_streams'])
        stream_B_count = sum(
            1 for event in events
            if 'audit' in event
            and 'Stream B: Hyper-coherence' in event['audit']['active_streams'])
        print(f" Stream A (hypo) activations: {stream_A_count}")
        print(f" Stream B (hyper) activations: {stream_B_count}")
        # Persist plots and state per scenario/mode
        system.visualize_dynamics(history, f"abcr_{scenario_name}_{mode.value}.png")
        system.save_state(f"abcr_state_{scenario_name}_{mode.value}.json")
        print()
    print(banner)
    print("ABCR system demonstration complete!")
    print(banner)
# Run the full demonstration when executed as a script.
if __name__ == "__main__":
    demonstrate_abcr_system()