|
|
import numpy as np |
|
|
from dataclasses import dataclass, field |
|
|
from typing import Dict, List, Tuple, Optional, Any, Literal |
|
|
from enum import Enum |
|
|
import json |
|
|
from datetime import datetime |
|
|
import logging |
|
|
import matplotlib.pyplot as plt |
|
|
from scipy.special import expit as sigmoid |
|
|
|
|
|
|
|
|
# Module-wide logging: INFO level on the root logger with a timestamped
# "time - logger name - level - message" layout.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Logger used by every component defined in this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
class FrequencyBand(Enum):
    """The five frequency bands tracked by the system.

    Iteration order (DELTA -> GAMMA) is relied on elsewhere in this module
    to derive the band axis index of capsule arrays.
    """

    DELTA = "delta"
    THETA = "theta"
    ALPHA = "alpha"
    BETA = "beta"
    GAMMA = "gamma"
|
|
|
|
|
class StreamType(Enum):
    """Identifies which processing stream handled (or is handling) a chain."""

    # Stream A deals with bands whose coherence fell below the low threshold.
    STREAM_A = "Stream A: Hypo-coherence"
    # Stream B deals with bands whose coherence rose above the high threshold.
    STREAM_B = "Stream B: Hyper-coherence"
    DUAL = "Dual Stream"
|
|
|
|
|
class SeamType(Enum):
    """Outcome classification assigned by the dual-stream audit."""

    # Audit residual within tolerance and negligible mean coherence change.
    TYPE_I = "Type I: Perfect Recovery"
    # Audit residual within tolerance but a measurable mean coherence change.
    TYPE_II = "Type II: Acceptable Loss"
    # Audit residual outside tolerance.
    TYPE_III = "Type III: Failed Recovery"
|
|
|
|
|
class SystemMode(Enum):
    """Operating modes; each mode selects a parameter set and threshold tweaks."""

    STANDARD = "standard"
    HIGH_SENSITIVITY = "high_sensitivity"
    STABILITY = "stability"
    RECOVERY = "recovery"
    ADAPTIVE = "adaptive"
|
|
|
|
|
class ChainState(Enum):
    """Coherence condition of a single band's chain."""

    HYPO = "hypo-coherent"
    HYPER = "hyper-coherent"
    INTACT = "intact"
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class SpatialPosition:
    """A node of the spatial grid.

    ``x``/``y`` are the Cartesian coordinates and ``m``/``n`` the integer
    lattice indices the coordinates were derived from.
    """

    x: float
    y: float
    m: int
    n: int

    def distance_to(self, other: 'SpatialPosition') -> float:
        """Return the Euclidean distance between this node and ``other``."""
        dx = self.x - other.x
        dy = self.y - other.y
        return np.sqrt(dx**2 + dy**2)

    def radius(self) -> float:
        """Return the Euclidean distance of this node from the origin."""
        x, y = self.x, self.y
        return np.sqrt(x**2 + y**2)

    def angle(self) -> float:
        """Return the polar angle of this node, in radians (``arctan2(y, x)``)."""
        return np.arctan2(self.y, self.x)
|
|
|
|
|
@dataclass
class ChainComponent:
    """Description of one band's chain that was flagged as broken."""

    # Band this chain belongs to.
    band: FrequencyBand
    # Grid nodes associated with the chain.
    positions: List[SpatialPosition]
    # Coherence value of the band at detection time.
    coherence: float
    # Phase spread measure for the band.
    phase_std: float
    # Hypo / hyper / intact classification.
    state: ChainState
    # Processing stream responsible for the chain.
    stream: StreamType
|
|
|
|
|
@dataclass
class DualAuditResult:
    """Everything produced by one dual-stream integrity audit."""

    # --- Stream A (hypo-coherence) ---
    delta_kappa_A: float  # mean coherence change over Stream A bands
    s_A: float            # Stream A audit residual

    # --- Stream B (hyper-coherence) ---
    delta_kappa_B: float  # mean coherence change over Stream B bands
    s_B: float            # Stream B audit residual

    # --- Combined metrics ---
    s_composite: float    # residual combined across the active streams
    tau_R: float          # time gap between original and reconstructed state
    D_C: float            # curvature-change term
    D_omega: float        # drift term
    R: float              # return credit
    I: float              # integrity value derived from the reconstructed state

    # --- Verdict ---
    seam_type: SeamType
    audit_pass: bool
    active_streams: List[StreamType]
    # Free-form extras; each instance gets its own dict.
    details: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
@dataclass
class AdaptiveThresholds:
    """Threshold set computed for one processing step."""

    # Coherence below this value marks a band as a hypo-coherence candidate.
    tau_low: float
    # Coherence above this value marks a band as a hyper-coherence candidate.
    tau_high: float
    # Phase-coherence cut-off used when confirming a broken chain.
    tau_phase: float
    # Mode- and stress-dependent weighting factor.
    alpha: float
    # System stress the thresholds were derived from.
    stress: float
    # Operating mode the thresholds were derived for.
    mode: SystemMode
|
|
|
|
|
|
|
|
|
|
|
class ABCRConfig:
    """Adaptive Bi-Coupled Coherence Recovery Configuration.

    A namespace of class-level constants read by the other components of
    this module. Nothing here is meant to be instantiated.
    """

    # --- Spatial grid ---
    # The grid spans indices -M..M and -N..N, spaced SPATIAL_UNIT apart.
    SPATIAL_GRID_M = 8
    SPATIAL_GRID_N = 8
    SPATIAL_UNIT = 0.1
    PROPAGATION_SPEED = 1.0

    # --- Base thresholds (adapted at runtime from system stress) ---
    TAU_BASE = 0.3
    TAU_PHASE = 0.5

    # --- Per-mode parameters ---
    # alpha_base / alpha_mod: alpha = alpha_base + alpha_mod * stress
    # rho / novelty / baseline: renewal blend weight, noise scale, pull target
    # tau_*_factor: optional multipliers for the low/high thresholds
    MODE_PARAMS = {
        SystemMode.STANDARD: {
            'alpha_base': 0.6,
            'alpha_mod': 0.1,
            'rho': 0.7,
            'novelty': 0.1,
            'baseline': 0.6,
        },
        SystemMode.HIGH_SENSITIVITY: {
            'alpha_base': 0.65,
            'alpha_mod': 0.15,
            'rho': 0.6,
            'novelty': 0.12,
            'baseline': 0.65,
            'tau_low_factor': 0.8,
            'tau_high_factor': 1.2,
        },
        SystemMode.STABILITY: {
            'alpha_base': 0.5,
            'alpha_mod': 0.05,
            'rho': 0.8,
            'novelty': 0.05,
            'baseline': 0.5,
        },
        SystemMode.RECOVERY: {
            'alpha_base': 0.65,
            'alpha_mod': 0.15,
            'rho': 0.6,
            'novelty': 0.15,
            'baseline': 0.7,
        },
        # SystemMode.ADAPTIVE previously had no entry, so any
        # MODE_PARAMS[mode] lookup in that mode raised KeyError. No dedicated
        # tuning exists for it, so it starts from the STANDARD values.
        # NOTE(review): replace with ADAPTIVE-specific tuning if one is defined.
        SystemMode.ADAPTIVE: {
            'alpha_base': 0.6,
            'alpha_mod': 0.1,
            'rho': 0.7,
            'novelty': 0.1,
            'baseline': 0.6,
        },
    }

    # --- Cross-stream coupling weight used in the Hamiltonians ---
    LAMBDA_CROSS_STREAM = 0.3

    # --- Audit ---
    # |s_composite| below AUDIT_TOLERANCE passes; a mean coherence change
    # below TYPE_I_THRESHOLD upgrades a pass to "Type I".
    AUDIT_TOLERANCE = 0.01
    TYPE_I_THRESHOLD = 1e-6

    # --- Emergency decouple limits on any single band's coherence ---
    EMERGENCY_HYPO_THRESHOLD = 0.1
    EMERGENCY_HYPER_THRESHOLD = 0.9

    # --- Iterative reconstruction ---
    MAX_RECONSTRUCTION_ITERATIONS = 100
    CONVERGENCE_TOLERANCE = 0.001

    # --- Representative frequency per band (presumably Hz - TODO confirm) ---
    BAND_FREQUENCIES = {
        FrequencyBand.DELTA: 2.0,
        FrequencyBand.THETA: 6.0,
        FrequencyBand.ALPHA: 10.0,
        FrequencyBand.BETA: 20.0,
        FrequencyBand.GAMMA: 40.0,
    }
|
|
|
|
|
|
|
|
|
|
|
class DualStreamEncoder:
    """Encodes states into forward and mirror capsules for bi-directional processing.

    A capsule is a complex array of shape ``(2M+1, 2N+1, number of bands)``;
    the grid node with lattice indices ``(m, n)`` lives at ``[m + M, n + N]``.
    """

    def __init__(self, M: int = ABCRConfig.SPATIAL_GRID_M,
                 N: int = ABCRConfig.SPATIAL_GRID_N):
        """Build the grid of positions for half-extents ``M`` and ``N``."""
        self.M = M
        self.N = N
        self.positions = self._initialize_positions()
        # Memo for compute_spatial_coupling, keyed by lattice indices + band values.
        self.spatial_cache = {}

    def _initialize_positions(self) -> List[SpatialPosition]:
        """Return every grid node, ordered by ``m`` then ``n``."""
        return [
            SpatialPosition(m * ABCRConfig.SPATIAL_UNIT, n * ABCRConfig.SPATIAL_UNIT, m, n)
            for m in range(-self.M, self.M + 1)
            for n in range(-self.N, self.N + 1)
        ]

    def encode_forward_capsule(self, kappa_bands: Dict[FrequencyBand, float],
                               phi_bands: Dict[FrequencyBand, float]) -> np.ndarray:
        """Standard forward encoding: C_F[m,n,b] = G_r * κ_b * exp(iφ_b - ik_b*r).

        ``G_r = exp(-r / (M * SPATIAL_UNIT))`` is a radial envelope and
        ``k_b = 2π * f_b / PROPAGATION_SPEED`` the band's wavenumber.
        """
        bands = list(FrequencyBand)
        capsule = np.zeros((2*self.M+1, 2*self.N+1, len(bands)), dtype=complex)

        for node in self.positions:
            r = node.radius()
            envelope = np.exp(-r / (self.M * ABCRConfig.SPATIAL_UNIT))
            row, col = node.m + self.M, node.n + self.N

            for b_idx, band in enumerate(bands):
                amplitude = kappa_bands[band]
                phase_offset = phi_bands[band]
                wavenumber = (2 * np.pi * ABCRConfig.BAND_FREQUENCIES[band]
                              / ABCRConfig.PROPAGATION_SPEED)
                # Outgoing wave: phase decreases with distance from the origin.
                phase = phase_offset - wavenumber * r
                capsule[row, col, b_idx] = envelope * amplitude * np.exp(1j * phase)

        return capsule

    def encode_mirror_capsule(self, kappa_bands: Dict[FrequencyBand, float],
                              phi_bands: Dict[FrequencyBand, float]) -> np.ndarray:
        """Mirror encoding: C_M[m,n,b] = G_r * (1-κ_b) * exp(i(π-φ_b) + ik_b*r).

        Same envelope and wavenumber as the forward capsule, with the
        amplitude complemented and the phase reflected.
        """
        bands = list(FrequencyBand)
        capsule = np.zeros((2*self.M+1, 2*self.N+1, len(bands)), dtype=complex)

        for node in self.positions:
            r = node.radius()
            envelope = np.exp(-r / (self.M * ABCRConfig.SPATIAL_UNIT))
            row, col = node.m + self.M, node.n + self.N

            for b_idx, band in enumerate(bands):
                amplitude = kappa_bands[band]
                phase_offset = phi_bands[band]
                wavenumber = (2 * np.pi * ABCRConfig.BAND_FREQUENCIES[band]
                              / ABCRConfig.PROPAGATION_SPEED)
                # Reflected phase, growing with distance from the origin.
                phase = (np.pi - phase_offset) + wavenumber * r
                capsule[row, col, b_idx] = envelope * (1 - amplitude) * np.exp(1j * phase)

        return capsule

    def compute_spatial_coupling(self, pos1: SpatialPosition, pos2: SpatialPosition,
                                 band1: FrequencyBand, band2: FrequencyBand) -> float:
        """Compute spatial coupling strength between positions and bands.

        The coupling is ``exp(-distance / SPATIAL_UNIT) * exp(-|f1 - f2| / 10)``.
        Results are memoised in ``self.spatial_cache``.
        """
        key = (pos1.m, pos1.n, pos2.m, pos2.n, band1.value, band2.value)
        cached = self.spatial_cache.get(key)
        if cached is not None:
            return cached

        separation = pos1.distance_to(pos2)
        spatial_term = np.exp(-separation / ABCRConfig.SPATIAL_UNIT)

        frequency_gap = abs(ABCRConfig.BAND_FREQUENCIES[band1] -
                            ABCRConfig.BAND_FREQUENCIES[band2])
        frequency_term = np.exp(-frequency_gap / 10.0)

        strength = spatial_term * frequency_term
        self.spatial_cache[key] = strength
        return strength
|
|
|
|
|
class AdaptiveThresholdManager:
    """Manages adaptive thresholds based on system stress and mode."""

    def compute_system_stress(self, kappa: Dict[FrequencyBand, float],
                              kappa_history: Optional[List[Dict[FrequencyBand, float]]] = None) -> float:
        """Compute system stress indicator σ(t), clipped to [0, 1].

        Args:
            kappa: Current coherence per band.
            kappa_history: Earlier coherence states, most recent last.
                When empty or None the stress is derived from the current
                state alone.

        Returns:
            With history: total absolute change since the last state divided
            by the total current coherence. Without history: twice the mean
            absolute deviation of the bands from 0.5.
        """
        if kappa_history:
            prev_kappa = kappa_history[-1]
            numerator = sum(abs(kappa[b] - prev_kappa[b]) for b in FrequencyBand)
            # Small epsilon keeps the ratio finite when every band is zero.
            denominator = sum(kappa.values()) + 1e-10
            stress = min(1.0, numerator / denominator)
        else:
            balanced = 0.5
            deviations = [abs(k - balanced) for k in kappa.values()]
            stress = np.mean(deviations) * 2

        return np.clip(stress, 0, 1)

    def compute_adaptive_thresholds(self, stress: float, mode: SystemMode) -> AdaptiveThresholds:
        """Compute adaptive thresholds based on stress and mode.

        Args:
            stress: System stress in [0, 1].
            mode: Operating mode. A mode with no entry in
                ``ABCRConfig.MODE_PARAMS`` uses the STANDARD parameters
                instead of raising ``KeyError``.

        Returns:
            Thresholds with ``tau_low`` clipped to [0.1, 0.5], ``tau_high``
            to [0.5, 0.9] and ``alpha`` to [0.3, 0.8].
        """
        params = ABCRConfig.MODE_PARAMS.get(mode)
        if params is None:
            # e.g. a mode declared in SystemMode but never given parameters.
            params = ABCRConfig.MODE_PARAMS[SystemMode.STANDARD]

        tau_base = ABCRConfig.TAU_BASE

        # Higher stress narrows both broken-chain zones symmetrically around 0.5.
        tau_low = tau_base * (1 - stress * 0.3)
        tau_high = 1 - tau_base * (1 - stress * 0.3)

        if mode == SystemMode.HIGH_SENSITIVITY:
            tau_low *= params.get('tau_low_factor', 0.8)
            tau_high *= params.get('tau_high_factor', 1.2)
        elif mode == SystemMode.STABILITY:
            tau_low *= 1.1
            tau_high *= 0.9

        alpha = params['alpha_base'] + params['alpha_mod'] * stress
        tau_phase = ABCRConfig.TAU_PHASE * (1 + stress * 0.1)

        return AdaptiveThresholds(
            tau_low=np.clip(tau_low, 0.1, 0.5),
            tau_high=np.clip(tau_high, 0.5, 0.9),
            tau_phase=tau_phase,
            alpha=np.clip(alpha, 0.3, 0.8),
            stress=stress,
            mode=mode,
        )
|
|
|
|
|
class DualStreamProcessor:
    """Processes both hypo and hyper coherence streams."""

    def __init__(self, encoder: DualStreamEncoder):
        """Keep a reference to the encoder whose grid the capsules were built on."""
        self.encoder = encoder
        self.embedding_cache = {}

    def detect_broken_chains(self, kappa: Dict[FrequencyBand, float],
                             C_F: np.ndarray, C_M: np.ndarray,
                             thresholds: AdaptiveThresholds) -> Tuple[List[ChainComponent],
                                                                      List[ChainComponent],
                                                                      Dict[FrequencyBand, float]]:
        """Detect broken chains in both streams.

        A band is a hypo candidate when its coherence is below
        ``thresholds.tau_low`` (checked against the forward capsule) and a
        hyper candidate when above ``thresholds.tau_high`` (checked against
        the mirror capsule). A candidate is only reported as broken when its
        phase coherence is below ``thresholds.tau_phase``; every other band
        is returned as intact.

        Returns:
            ``(broken_A, broken_B, intact)``: hypo chains, hyper chains and a
            band -> coherence mapping for the remaining bands.
        """
        hypo_chains = []
        hyper_chains = []
        healthy = {}

        for b_idx, band in enumerate(FrequencyBand):
            level = kappa[band]

            # Pick the stream this band would belong to, if any.
            if level < thresholds.tau_low:
                capsule, bucket = C_F, hypo_chains
                state, stream = ChainState.HYPO, StreamType.STREAM_A
                label = "Stream A - Hypo-coherent chain"
            elif level > thresholds.tau_high:
                capsule, bucket = C_M, hyper_chains
                state, stream = ChainState.HYPER, StreamType.STREAM_B
                label = "Stream B - Hyper-coherent chain"
            else:
                healthy[band] = level
                continue

            phase_coh = self._compute_phase_coherence(capsule, band, b_idx)

            if phase_coh < thresholds.tau_phase:
                bucket.append(ChainComponent(
                    band=band,
                    positions=self._get_significant_positions(capsule, b_idx),
                    coherence=level,
                    phase_std=np.sqrt(1 - phase_coh),
                    state=state,
                    stream=stream
                ))
                logger.info(f"{label}: {band.value} "
                            f"(κ={level:.3f}, phase_coh={phase_coh:.3f})")
            else:
                # Out of range, but phases are still aligned: treat as intact.
                healthy[band] = level

        return hypo_chains, hyper_chains, healthy

    def _compute_phase_coherence(self, capsule: np.ndarray, band: FrequencyBand,
                                 b_idx: int) -> float:
        """Compute phase coherence for a band.

        This is the length of the mean unit phasor over all grid nodes whose
        amplitude exceeds 1e-6; 0.0 when fewer than two nodes qualify.
        ``band`` is accepted for symmetry with callers but not used.
        """
        row0, col0 = self.encoder.M, self.encoder.N
        samples = (capsule[node.m + row0, node.n + col0, b_idx]
                   for node in self.encoder.positions)
        phases = [np.angle(sample) for sample in samples if np.abs(sample) > 1e-6]

        if len(phases) < 2:
            return 0.0

        unit_phasors = np.exp(1j * np.array(phases))
        return np.abs(np.mean(unit_phasors))

    def _get_significant_positions(self, capsule: np.ndarray, b_idx: int,
                                   threshold: float = 1e-3) -> List[SpatialPosition]:
        """Get positions with significant amplitude (strictly above ``threshold``)."""
        row0, col0 = self.encoder.M, self.encoder.N
        return [
            node for node in self.encoder.positions
            if np.abs(capsule[node.m + row0, node.n + col0, b_idx]) > threshold
        ]
|
|
|
|
|
class BiCoupledReconstructor:
    """Reconstructs coherence using bi-coupled Hamiltonians."""

    def __init__(self, encoder: DualStreamEncoder):
        """Store the encoder and start with empty per-band Hamiltonian tables."""
        self.encoder = encoder
        self.H_A = {}
        self.H_B = {}

    def _capsule_sum(self, capsule: np.ndarray, nodes: List[SpatialPosition], b_idx: int):
        """Sum one band of ``capsule`` over ``nodes`` (0 when ``nodes`` is empty)."""
        total = 0
        for node in nodes:
            total += capsule[node.m + self.encoder.M, node.n + self.encoder.N, b_idx]
        return total

    def _coupling_sum(self, component: ChainComponent,
                      intact: Dict[FrequencyBand, float], complement: bool):
        """Accumulate the coupling of a broken chain to every intact band.

        Each (chain node, grid node) pair contributes its spatial coupling
        times the intact band's coherence, or times one minus that coherence
        when ``complement`` is set.
        """
        total = 0
        for other_band, other_level in intact.items():
            weight = (1 - other_level) if complement else other_level
            for chain_node in component.positions:
                for grid_node in self.encoder.positions:
                    strength = self.encoder.compute_spatial_coupling(
                        chain_node, grid_node, component.band, other_band
                    )
                    total += strength * weight
        return total

    def compute_hamiltonians(self, broken_A: List[ChainComponent],
                             broken_B: List[ChainComponent],
                             intact: Dict[FrequencyBand, float],
                             C_F: np.ndarray, C_M: np.ndarray):
        """Compute bi-coupled Hamiltonians for both streams.

        For a Stream A chain: ``H_A = h_F + J + λ * h_M``; for a Stream B
        chain: ``H_B = h_M + J' + λ * h_F``. ``h_F``/``h_M`` are the forward
        and mirror capsule sums over the chain's nodes, ``J``/``J'`` the
        couplings to the intact bands, and ``λ`` is
        ``ABCRConfig.LAMBDA_CROSS_STREAM``. Results are stored per band in
        ``self.H_A`` / ``self.H_B``.
        """
        band_order = list(FrequencyBand)
        cross = ABCRConfig.LAMBDA_CROSS_STREAM

        for chain in broken_A:
            b_idx = band_order.index(chain.band)
            own_field = self._capsule_sum(C_F, chain.positions, b_idx)
            other_field = self._capsule_sum(C_M, chain.positions, b_idx)
            coupling = self._coupling_sum(chain, intact, complement=False)
            self.H_A[chain.band] = own_field + coupling + cross * other_field

        for chain in broken_B:
            b_idx = band_order.index(chain.band)
            own_field = self._capsule_sum(C_M, chain.positions, b_idx)
            other_field = self._capsule_sum(C_F, chain.positions, b_idx)
            coupling = self._coupling_sum(chain, intact, complement=True)
            self.H_B[chain.band] = own_field + coupling + cross * other_field

    def reconstruct(self, broken_A: List[ChainComponent],
                    broken_B: List[ChainComponent],
                    intact: Dict[FrequencyBand, float]) -> Dict[FrequencyBand, float]:
        """Iteratively reconstruct broken chains using bi-coupled fields.

        Intact bands keep their value. Stream A bands are seeded at 0.3 and
        driven to ``sigmoid(|H_A|)``; Stream B bands are seeded at 0.7 and
        driven to ``1 - sigmoid(|H_B|)``. Sweeps repeat until no band moves
        by more than ``ABCRConfig.CONVERGENCE_TOLERANCE`` or the iteration
        cap is hit.
        """
        kappa_recon = intact.copy()
        kappa_recon.update({chain.band: 0.3 for chain in broken_A})
        kappa_recon.update({chain.band: 0.7 for chain in broken_B})

        tolerance = ABCRConfig.CONVERGENCE_TOLERANCE

        for sweep in range(ABCRConfig.MAX_RECONSTRUCTION_ITERATIONS):
            moved = False

            for chain in broken_A:
                band = chain.band
                drive = np.abs(self.H_A.get(band, 0))
                updated = sigmoid(drive)
                if abs(updated - kappa_recon[band]) > tolerance:
                    moved = True
                kappa_recon[band] = updated

            for chain in broken_B:
                band = chain.band
                drive = np.abs(self.H_B.get(band, 0))
                updated = 1 - sigmoid(drive)
                if abs(updated - kappa_recon[band]) > tolerance:
                    moved = True
                kappa_recon[band] = updated

            if not moved:
                logger.info(f"Reconstruction converged in {sweep+1} iterations")
                break

        return kappa_recon
|
|
|
|
|
class DualStreamAuditor:
    """Performs integrity audit across both processing streams."""

    def audit(self, kappa_original: Dict[FrequencyBand, float],
              kappa_recon: Dict[FrequencyBand, float],
              broken_A: List[ChainComponent],
              broken_B: List[ChainComponent],
              timestamp_original: float,
              timestamp_recon: float) -> DualAuditResult:
        """Perform dual-stream integrity audit.

        Args:
            kappa_original: Coherence per band before reconstruction.
            kappa_recon: Coherence per band after reconstruction.
            broken_A: Hypo-coherent chains that were reconstructed.
            broken_B: Hyper-coherent chains that were reconstructed.
            timestamp_original: Time of the original state.
            timestamp_recon: Time of the reconstructed state.

        Returns:
            A ``DualAuditResult``. The audit passes when the absolute
            composite residual is below ``ABCRConfig.AUDIT_TOLERANCE``.
        """
        # Per-stream mean change over the bands each stream repaired.
        delta_kappa_A = self._compute_stream_delta(kappa_original, kappa_recon, broken_A)
        delta_kappa_B = self._compute_stream_delta(kappa_original, kappa_recon, broken_B)

        # Terms shared by both streams.
        tau_R = abs(timestamp_recon - timestamp_original)
        D_C = self._compute_curvature_change(kappa_original, kappa_recon)
        D_omega = self._compute_entropy_drift(kappa_original, kappa_recon)
        R = self._compute_return_credit(kappa_original, kappa_recon)

        # Residual = credit earned minus accumulated cost; 0 for an idle stream.
        s_A = R * tau_R - (delta_kappa_A + D_omega + D_C) if broken_A else 0
        s_B = R * tau_R - (delta_kappa_B + D_omega + D_C) if broken_B else 0

        active_streams = []
        if broken_A:
            active_streams.append(StreamType.STREAM_A)
        if broken_B:
            active_streams.append(StreamType.STREAM_B)

        # Combine the residuals, weighting each stream by its chain count.
        if broken_A and broken_B:
            total_chains = len(broken_A) + len(broken_B)
            w_A = len(broken_A) / total_chains
            w_B = len(broken_B) / total_chains
            s_composite = w_A * s_A + w_B * s_B
        elif broken_A:
            s_composite = s_A
        elif broken_B:
            s_composite = s_B
        else:
            s_composite = 0

        delta_kappa_avg = np.mean([kappa_recon[b] - kappa_original[b]
                                   for b in FrequencyBand])

        if abs(s_composite) < ABCRConfig.AUDIT_TOLERANCE:
            if abs(delta_kappa_avg) < ABCRConfig.TYPE_I_THRESHOLD:
                seam_type = SeamType.TYPE_I
            else:
                seam_type = SeamType.TYPE_II
            audit_pass = True
        else:
            seam_type = SeamType.TYPE_III
            audit_pass = False

        kappa_final = np.mean(list(kappa_recon.values()))
        I = np.exp(kappa_final)

        result = DualAuditResult(
            delta_kappa_A=delta_kappa_A,
            s_A=s_A,
            delta_kappa_B=delta_kappa_B,
            s_B=s_B,
            s_composite=s_composite,
            tau_R=tau_R,
            D_C=D_C,
            D_omega=D_omega,
            R=R,
            I=I,
            seam_type=seam_type,
            audit_pass=audit_pass,
            active_streams=active_streams,
            details={
                'broken_A_count': len(broken_A),
                'broken_B_count': len(broken_B),
                'delta_kappa_avg': delta_kappa_avg
            }
        )

        logger.info(f"Dual-stream audit: {seam_type.value}, "
                    f"s_composite={s_composite:.6f}, pass={audit_pass}")

        return result

    def _compute_stream_delta(self, original: Dict[FrequencyBand, float],
                              recon: Dict[FrequencyBand, float],
                              broken: List[ChainComponent]) -> float:
        """Compute average delta for a specific stream (0 when it has no chains)."""
        if not broken:
            return 0

        return np.mean([recon[c.band] - original[c.band] for c in broken])

    def _compute_curvature_change(self, original: Dict[FrequencyBand, float],
                                  recon: Dict[FrequencyBand, float]) -> float:
        """Compute change in spectral curvature.

        Curvature is the mean absolute second difference of the coherence
        values taken band by band. Both profiles are read in the key order of
        ``original``: the reconstructed dict is assembled in a different
        insertion order (intact bands first, repaired bands last), so taking
        ``recon.values()`` directly would compare misaligned bands.

        Returns:
            Absolute difference between the two curvatures, or 0.0 when
            fewer than three bands are present.
        """
        bands = list(original)
        original_vals = np.array([original[b] for b in bands])
        recon_vals = np.array([recon[b] for b in bands])

        if len(original_vals) >= 3:
            original_curvature = np.mean(np.abs(np.diff(original_vals, n=2)))
            recon_curvature = np.mean(np.abs(np.diff(recon_vals, n=2)))
            return abs(recon_curvature - original_curvature)

        return 0.0

    def _compute_entropy_drift(self, original: Dict[FrequencyBand, float],
                               recon: Dict[FrequencyBand, float]) -> float:
        """Compute entropy-based drift metric.

        Implemented as the standard deviation of the per-band
        reconstruction errors.
        """
        errors = [recon[b] - original[b] for b in FrequencyBand]
        return np.std(errors)

    def _compute_return_credit(self, original: Dict[FrequencyBand, float],
                               recon: Dict[FrequencyBand, float]) -> float:
        """Compute return credit based on recovery quality.

        Each band with a positive original value scores
        ``1 - |1 - recon/original|`` with the ratio clipped to [0, 2], so a
        perfect match scores 1. Returns the mean score, or 0.0 when no band
        qualifies.
        """
        ratios = []
        for band in FrequencyBand:
            if original[band] > 0:
                ratio = np.clip(recon[band] / original[band], 0, 2)
                ratios.append(1 - abs(1 - ratio))

        return np.mean(ratios) if ratios else 0.0
|
|
|
|
|
class AdaptiveRenewalEngine:
    """Manages cognitive renewal with mode-dependent strategies."""

    def __init__(self):
        """Start with no invariant field and empty histories."""
        # Slowly-updated reference coherence per band; None until initialized.
        self.Pi = None
        self.renewal_history = []
        self.release_history = []

    def initialize_invariant_field(self, kappa_initial: Dict[FrequencyBand, float]):
        """Initialize the invariant field Pi with a copy of ``kappa_initial``."""
        self.Pi = kappa_initial.copy()
        logger.info(f"Invariant field initialized: mean κ = {np.mean(list(self.Pi.values())):.3f}")

    def update_invariant_field(self, kappa_current: Dict[FrequencyBand, float],
                               beta: float = 0.1):
        """Update invariant field with exponential moving average.

        Args:
            kappa_current: Latest coherence per band.
            beta: Weight given to ``kappa_current``. If Pi has not been
                initialized yet it is simply set to ``kappa_current``.
        """
        if self.Pi is None:
            self.initialize_invariant_field(kappa_current)
            return

        for band in FrequencyBand:
            self.Pi[band] = (1 - beta) * self.Pi[band] + beta * kappa_current[band]

    def perform_renewal(self, kappa_fragmented: Dict[FrequencyBand, float],
                        mode: SystemMode) -> Dict[FrequencyBand, float]:
        """Perform adaptive renewal based on mode.

        Each band is renewed as ``rho * Pi + (1 - rho) * baseline + noise``,
        with Gaussian noise of standard deviation ``novelty``, then clipped
        to [0, 1]. The renewal is appended to ``renewal_history``.

        Args:
            kappa_fragmented: Coherence before renewal (recorded and logged).
            mode: Operating mode selecting ``rho``/``novelty``/``baseline``.
                A mode with no entry in ``ABCRConfig.MODE_PARAMS`` uses the
                STANDARD parameters instead of raising ``KeyError``.

        Returns:
            The renewed coherence per band, or ``kappa_fragmented`` unchanged
            when Pi has not been initialized.
        """
        if self.Pi is None:
            logger.warning("Cannot renew: Pi not initialized")
            return kappa_fragmented

        params = ABCRConfig.MODE_PARAMS.get(mode)
        if params is None:
            # e.g. a mode declared in SystemMode but never given parameters.
            params = ABCRConfig.MODE_PARAMS[SystemMode.STANDARD]
        rho = params['rho']
        novelty = params['novelty']
        baseline = params['baseline']

        kappa_renewed = {}

        for band in FrequencyBand:
            xi = np.random.normal(0, novelty)
            blended = rho * self.Pi[band] + (1 - rho) * baseline + xi
            kappa_renewed[band] = np.clip(blended, 0, 1)

        self.renewal_history.append({
            'timestamp': datetime.now().isoformat(),
            'mode': mode.value,
            'kappa_before': kappa_fragmented.copy(),
            'kappa_after': kappa_renewed.copy(),
            'Pi_state': self.Pi.copy()
        })

        logger.info(f"Renewal ({mode.value}): mean κ before={np.mean(list(kappa_fragmented.values())):.3f}, "
                    f"after={np.mean(list(kappa_renewed.values())):.3f}")

        return kappa_renewed
|
|
|
|
|
|
|
|
|
|
|
class AdaptiveBiCoupledCoherenceSystem: |
|
|
"""Main ABCR system orchestrating all components""" |
|
|
|
|
|
def __init__(self, mode: SystemMode = SystemMode.STANDARD): |
|
|
self.mode = mode |
|
|
self.encoder = DualStreamEncoder() |
|
|
self.threshold_manager = AdaptiveThresholdManager() |
|
|
self.processor = DualStreamProcessor(self.encoder) |
|
|
self.reconstructor = BiCoupledReconstructor(self.encoder) |
|
|
self.auditor = DualStreamAuditor() |
|
|
self.renewal_engine = AdaptiveRenewalEngine() |
|
|
|
|
|
self.current_capsules = {'forward': None, 'mirror': None} |
|
|
self.kappa_history = [] |
|
|
self.system_history = [] |
|
|
|
|
|
logger.info(f"ABCR System initialized in {mode.value} mode") |
|
|
|
|
|
def process_coherence_state(self, kappa: Dict[FrequencyBand, float], |
|
|
phi: Dict[FrequencyBand, float], |
|
|
timestamp: float) -> Optional[Dict[FrequencyBand, float]]: |
|
|
"""Main processing pipeline""" |
|
|
|
|
|
|
|
|
stress = self.threshold_manager.compute_system_stress(kappa, self.kappa_history) |
|
|
thresholds = self.threshold_manager.compute_adaptive_thresholds(stress, self.mode) |
|
|
|
|
|
logger.info(f"System stress: {stress:.3f}, tau_low={thresholds.tau_low:.3f}, " |
|
|
f"tau_high={thresholds.tau_high:.3f}, alpha={thresholds.alpha:.3f}") |
|
|
|
|
|
|
|
|
C_F = self.encoder.encode_forward_capsule(kappa, phi) |
|
|
C_M = self.encoder.encode_mirror_capsule(kappa, phi) |
|
|
self.current_capsules = {'forward': C_F, 'mirror': C_M} |
|
|
|
|
|
|
|
|
broken_A, broken_B, intact = self.processor.detect_broken_chains( |
|
|
kappa, C_F, C_M, thresholds |
|
|
) |
|
|
|
|
|
|
|
|
if self._check_emergency_conditions(kappa, broken_A, broken_B): |
|
|
logger.critical("EMERGENCY DECOUPLE TRIGGERED") |
|
|
return None |
|
|
|
|
|
|
|
|
if not broken_A and not broken_B: |
|
|
logger.info("No broken chains detected - state stable") |
|
|
self.kappa_history.append(kappa.copy()) |
|
|
return kappa |
|
|
|
|
|
logger.info("=" * 70) |
|
|
logger.info(f"DUAL-STREAM RECOVERY: {len(broken_A)} hypo, {len(broken_B)} hyper chains") |
|
|
logger.info("=" * 70) |
|
|
|
|
|
|
|
|
self.reconstructor.compute_hamiltonians(broken_A, broken_B, intact, C_F, C_M) |
|
|
|
|
|
|
|
|
kappa_recon = self.reconstructor.reconstruct(broken_A, broken_B, intact) |
|
|
|
|
|
|
|
|
audit_result = self.auditor.audit( |
|
|
kappa, kappa_recon, broken_A, broken_B, |
|
|
timestamp - 0.1 if self.kappa_history else timestamp, |
|
|
timestamp |
|
|
) |
|
|
|
|
|
|
|
|
if audit_result.audit_pass: |
|
|
kappa_final = self.renewal_engine.perform_renewal(kappa_recon, self.mode) |
|
|
self.renewal_engine.update_invariant_field(kappa_final) |
|
|
|
|
|
self._record_event('successful_recovery', timestamp, kappa, kappa_final, audit_result) |
|
|
self.kappa_history.append(kappa_final.copy()) |
|
|
|
|
|
logger.info(f"✓ Recovery successful: {audit_result.seam_type.value}") |
|
|
logger.info("=" * 70) |
|
|
return kappa_final |
|
|
else: |
|
|
logger.error(f"✗ Audit failed: {audit_result.seam_type.value}") |
|
|
self._record_event('failed_recovery', timestamp, kappa, kappa_recon, audit_result) |
|
|
logger.info("=" * 70) |
|
|
|
|
|
|
|
|
return self._fallback_recovery(kappa) |
|
|
|
|
|
def _check_emergency_conditions(self, kappa: Dict[FrequencyBand, float], |
|
|
broken_A: List[ChainComponent], |
|
|
broken_B: List[ChainComponent]) -> bool: |
|
|
"""Check for emergency decouple conditions""" |
|
|
|
|
|
min_kappa = min(kappa.values()) |
|
|
max_kappa = max(kappa.values()) |
|
|
|
|
|
if min_kappa < ABCRConfig.EMERGENCY_HYPO_THRESHOLD: |
|
|
logger.critical(f"Emergency: Extreme hypo-coherence (min={min_kappa:.3f})") |
|
|
return True |
|
|
|
|
|
if max_kappa > ABCRConfig.EMERGENCY_HYPER_THRESHOLD: |
|
|
logger.critical(f"Emergency: Extreme hyper-coherence (max={max_kappa:.3f})") |
|
|
return True |
|
|
|
|
|
if len(broken_A) + len(broken_B) >= len(FrequencyBand): |
|
|
logger.critical("Emergency: All bands broken") |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
def _fallback_recovery(self, kappa: Dict[FrequencyBand, float]) -> Dict[FrequencyBand, float]: |
|
|
"""Fallback recovery when audit fails""" |
|
|
|
|
|
if self.renewal_engine.Pi is not None: |
|
|
|
|
|
return self.renewal_engine.Pi.copy() |
|
|
else: |
|
|
|
|
|
return {band: 0.5 for band in FrequencyBand} |
|
|
|
|
|
def _record_event(self, event_type: str, timestamp: float, |
|
|
kappa_before: Dict[FrequencyBand, float], |
|
|
kappa_after: Dict[FrequencyBand, float], |
|
|
audit_result: DualAuditResult): |
|
|
"""Record system event for history""" |
|
|
|
|
|
self.system_history.append({ |
|
|
'timestamp': timestamp, |
|
|
'event': event_type, |
|
|
'kappa_before': kappa_before.copy(), |
|
|
'kappa_after': kappa_after.copy(), |
|
|
'audit': { |
|
|
'seam_type': audit_result.seam_type.value, |
|
|
's_composite': audit_result.s_composite, |
|
|
's_A': audit_result.s_A, |
|
|
's_B': audit_result.s_B, |
|
|
'active_streams': [s.value for s in audit_result.active_streams] |
|
|
} |
|
|
}) |
|
|
|
|
|
def set_mode(self, mode: SystemMode): |
|
|
"""Change operational mode""" |
|
|
self.mode = mode |
|
|
logger.info(f"System mode changed to: {mode.value}") |
|
|
|
|
|
def simulate_dynamics(self, duration: float = 10.0, dt: float = 0.1, |
|
|
scenario: str = "dual_stress") -> List[Dict]: |
|
|
"""Simulate coherence dynamics with various scenarios""" |
|
|
|
|
|
time_points = int(duration / dt) |
|
|
coherence_history = [] |
|
|
|
|
|
|
|
|
kappa_baseline = { |
|
|
FrequencyBand.DELTA: 0.72, |
|
|
FrequencyBand.THETA: 0.68, |
|
|
FrequencyBand.ALPHA: 0.75, |
|
|
FrequencyBand.BETA: 0.70, |
|
|
FrequencyBand.GAMMA: 0.65 |
|
|
} |
|
|
|
|
|
phi_baseline = { |
|
|
FrequencyBand.DELTA: 0.1, |
|
|
FrequencyBand.THETA: 0.3, |
|
|
FrequencyBand.ALPHA: 0.5, |
|
|
FrequencyBand.BETA: 0.7, |
|
|
FrequencyBand.GAMMA: 0.9 |
|
|
} |
|
|
|
|
|
current_kappa = kappa_baseline.copy() |
|
|
current_phi = phi_baseline.copy() |
|
|
|
|
|
|
|
|
self.renewal_engine.initialize_invariant_field(current_kappa) |
|
|
|
|
|
for i in range(time_points): |
|
|
t = i * dt |
|
|
|
|
|
|
|
|
if scenario == "dual_stress": |
|
|
if 2.0 <= t <= 4.0: |
|
|
|
|
|
current_kappa[FrequencyBand.DELTA] = 0.15 |
|
|
current_kappa[FrequencyBand.THETA] = 0.20 |
|
|
current_kappa[FrequencyBand.BETA] = 0.18 |
|
|
elif 5.0 <= t <= 7.0: |
|
|
|
|
|
current_kappa[FrequencyBand.ALPHA] = 0.92 |
|
|
current_kappa[FrequencyBand.GAMMA] = 0.88 |
|
|
current_kappa[FrequencyBand.BETA] = 0.85 |
|
|
|
|
|
elif scenario == "oscillatory": |
|
|
|
|
|
for band in FrequencyBand: |
|
|
freq = ABCRConfig.BAND_FREQUENCIES[band] |
|
|
current_kappa[band] = 0.5 + 0.4 * np.sin(2 * np.pi * freq * t / 20) |
|
|
|
|
|
elif scenario == "cascade": |
|
|
|
|
|
if t > 3.0: |
|
|
failed_bands = int((t - 3.0) / 1.5) |
|
|
for idx, band in enumerate(FrequencyBand): |
|
|
if idx < failed_bands: |
|
|
current_kappa[band] = 0.1 |
|
|
|
|
|
|
|
|
for band in FrequencyBand: |
|
|
current_kappa[band] += np.random.normal(0, 0.02) |
|
|
current_kappa[band] = np.clip(current_kappa[band], 0, 1) |
|
|
|
|
|
|
|
|
recovered = self.process_coherence_state(current_kappa, current_phi, t) |
|
|
|
|
|
if recovered is not None: |
|
|
current_kappa = recovered |
|
|
|
|
|
coherence_history.append({ |
|
|
'timestamp': t, |
|
|
'kappa_state': current_kappa.copy(), |
|
|
'recovered': recovered is not None |
|
|
}) |
|
|
|
|
|
return coherence_history |
|
|
|
|
|
def visualize_dynamics(self, coherence_history: List[Dict], save_path: str = None): |
|
|
"""Visualize dual-stream coherence dynamics""" |
|
|
|
|
|
timestamps = [entry['timestamp'] for entry in coherence_history] |
|
|
kappa_values = {band: [] for band in FrequencyBand} |
|
|
|
|
|
for entry in coherence_history: |
|
|
for band in FrequencyBand: |
|
|
kappa_values[band].append(entry['kappa_state'][band]) |
|
|
|
|
|
fig, axes = plt.subplots(3, 1, figsize=(14, 12)) |
|
|
|
|
|
colors = { |
|
|
FrequencyBand.DELTA: 'blue', |
|
|
FrequencyBand.THETA: 'green', |
|
|
FrequencyBand.ALPHA: 'red', |
|
|
FrequencyBand.BETA: 'orange', |
|
|
FrequencyBand.GAMMA: 'purple' |
|
|
} |
|
|
|
|
|
|
|
|
ax1 = axes[0] |
|
|
for band in FrequencyBand: |
|
|
ax1.plot(timestamps, kappa_values[band], label=band.value, |
|
|
color=colors[band], linewidth=2) |
|
|
|
|
|
|
|
|
ax1.axhspan(0, ABCRConfig.TAU_BASE, alpha=0.1, color='blue', |
|
|
label='Hypo-coherent zone') |
|
|
ax1.axhspan(1-ABCRConfig.TAU_BASE, 1, alpha=0.1, color='red', |
|
|
label='Hyper-coherent zone') |
|
|
ax1.axhline(y=0.5, color='gray', linestyle=':', alpha=0.5) |
|
|
|
|
|
ax1.set_ylabel('Coherence κ') |
|
|
ax1.set_title('Adaptive Bi-Coupled Coherence Recovery - Dual Stream Processing') |
|
|
ax1.legend(loc='upper right') |
|
|
ax1.grid(True, alpha=0.3) |
|
|
ax1.set_ylim(-0.05, 1.05) |
|
|
|
|
|
|
|
|
ax2 = axes[1] |
|
|
stream_A_activity = [] |
|
|
stream_B_activity = [] |
|
|
|
|
|
for event in self.system_history: |
|
|
if 'audit' in event: |
|
|
streams = event['audit']['active_streams'] |
|
|
t = event['timestamp'] |
|
|
if 'Stream A: Hypo-coherence' in streams: |
|
|
stream_A_activity.append(t) |
|
|
if 'Stream B: Hyper-coherence' in streams: |
|
|
stream_B_activity.append(t) |
|
|
|
|
|
if stream_A_activity: |
|
|
ax2.scatter(stream_A_activity, [0.3]*len(stream_A_activity), |
|
|
color='blue', s=50, alpha=0.7, label='Stream A (Hypo)') |
|
|
if stream_B_activity: |
|
|
ax2.scatter(stream_B_activity, [0.7]*len(stream_B_activity), |
|
|
color='red', s=50, alpha=0.7, label='Stream B (Hyper)') |
|
|
|
|
|
ax2.set_ylabel('Stream Activity') |
|
|
ax2.set_title('Dual-Stream Processing Activity') |
|
|
ax2.legend() |
|
|
ax2.grid(True, alpha=0.3) |
|
|
ax2.set_ylim(0, 1) |
|
|
|
|
|
|
|
|
ax3 = axes[2] |
|
|
audit_times = [] |
|
|
s_composite_vals = [] |
|
|
seam_types = [] |
|
|
|
|
|
for event in self.system_history: |
|
|
if 'audit' in event: |
|
|
audit_times.append(event['timestamp']) |
|
|
s_composite_vals.append(event['audit']['s_composite']) |
|
|
seam_types.append(event['audit']['seam_type']) |
|
|
|
|
|
if audit_times: |
|
|
|
|
|
seam_colors = [] |
|
|
for seam in seam_types: |
|
|
if 'Type I' in seam: |
|
|
seam_colors.append('green') |
|
|
elif 'Type II' in seam: |
|
|
seam_colors.append('orange') |
|
|
else: |
|
|
seam_colors.append('red') |
|
|
|
|
|
ax3.scatter(audit_times, s_composite_vals, c=seam_colors, s=30, alpha=0.7) |
|
|
ax3.axhline(y=0, color='gray', linestyle='-', alpha=0.5) |
|
|
ax3.axhline(y=ABCRConfig.AUDIT_TOLERANCE, color='green', |
|
|
linestyle='--', alpha=0.5, label='Audit threshold') |
|
|
ax3.axhline(y=-ABCRConfig.AUDIT_TOLERANCE, color='green', |
|
|
linestyle='--', alpha=0.5) |
|
|
|
|
|
ax3.set_xlabel('Time (s)') |
|
|
ax3.set_ylabel('Composite Residual') |
|
|
ax3.set_title('Dual-Stream Audit Results') |
|
|
ax3.legend() |
|
|
ax3.grid(True, alpha=0.3) |
|
|
|
|
|
plt.tight_layout() |
|
|
|
|
|
if save_path: |
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight') |
|
|
logger.info(f"Visualization saved to {save_path}") |
|
|
|
|
|
plt.show() |
|
|
|
|
|
def save_state(self, filepath: str):
    """Save system state to file.

    Writes a JSON document (indent=2) containing the current mode value,
    the renewal engine's invariant field ``Pi``, the full system and
    renewal histories, and the ten most recent ``kappa_history`` entries.

    Args:
        filepath: Destination path; opened in write mode, so an existing
            file is overwritten.

    Raises:
        OSError: If the file cannot be opened for writing.
        TypeError: If a dict inside the state uses a key type that json
            cannot encode (the fallback hook only applies to values).
    """

    def _to_jsonable(obj):
        # json only calls this hook for objects it cannot encode natively.
        # Blindly using ``str`` would store numpy arrays as their printed
        # form (elided with "..." when large), which cannot be restored;
        # emit real lists / scalars instead.
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.generic):
            return obj.item()
        # Anything else keeps the original best-effort string fallback.
        return str(obj)

    state = {
        'mode': self.mode.value,
        'invariant_field': self.renewal_engine.Pi,
        'system_history': self.system_history,
        'renewal_history': self.renewal_engine.renewal_history,
        # Only the tail of kappa_history is persisted; a falsy history
        # (empty or None) is stored as an empty list.
        'kappa_history': self.kappa_history[-10:] if self.kappa_history else [],
    }

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(state, f, indent=2, default=_to_jsonable)

    logger.info(f"System state saved to {filepath}")
|
|
|
|
|
def load_state(self, filepath: str):
    """Load system state from file.

    Reads the JSON document at ``filepath`` and restores ``mode``,
    ``renewal_engine.Pi``, ``system_history``,
    ``renewal_engine.renewal_history`` and ``kappa_history`` from it.
    Values are assigned exactly as decoded by ``json``; no type
    conversion is applied apart from rebuilding the ``SystemMode`` enum.

    Args:
        filepath: Path of the JSON state file to read.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If a required key is missing from the document.
        ValueError: If the stored mode is not a valid ``SystemMode`` value.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        state = json.load(f)

    # Resolve every field before touching self, so a malformed file raises
    # without leaving the system in a half-restored state.
    mode = SystemMode(state['mode'])
    invariant_field = state['invariant_field']
    system_history = state['system_history']
    renewal_history = state['renewal_history']
    kappa_history = state['kappa_history']

    self.mode = mode
    # NOTE(review): Pi comes back as plain JSON types (list/float/str);
    # if the engine expects a numpy array, convert here - confirm.
    self.renewal_engine.Pi = invariant_field
    self.system_history = system_history
    self.renewal_engine.renewal_history = renewal_history
    self.kappa_history = kappa_history

    logger.info(f"System state loaded from {filepath}")
|
|
|
|
|
|
|
|
|
|
|
def demonstrate_abcr_system():
    """Demonstrate the complete ABCR system with various scenarios.

    For each (scenario, mode) pair this builds a fresh
    ``AdaptiveBiCoupledCoherenceSystem``, simulates 10.0 s at dt=0.1,
    prints summary counts derived from ``system.system_history``, then
    calls ``visualize_dynamics`` and ``save_state`` with file names built
    from the scenario name and mode value (relative paths).
    """
    rule = "=" * 70

    print(rule)
    print("ADAPTIVE BI-COUPLED COHERENCE RECOVERY (ABCR) SYSTEM")
    print(rule)
    print()

    # (scenario name, operating mode) pairs exercised by the demo.
    demo_runs = (
        ("dual_stress", SystemMode.ADAPTIVE),
        ("oscillatory", SystemMode.HIGH_SENSITIVITY),
        ("cascade", SystemMode.RECOVERY),
    )

    for scenario_name, mode in demo_runs:
        print(f"\nTesting scenario: {scenario_name} with {mode.value} mode")
        print("-" * 50)

        system = AdaptiveBiCoupledCoherenceSystem(mode=mode)
        history = system.simulate_dynamics(duration=10.0, dt=0.1, scenario=scenario_name)

        # Tally events explicitly flagged as successful recoveries.
        successful_recoveries = 0
        for entry in system.system_history:
            if entry['event'] == 'successful_recovery':
                successful_recoveries += 1
        total_events = len(system.system_history)

        print(f"Results for {scenario_name}:")
        print(f"  Total events: {len(history)}")
        print(f"  Recovery attempts: {total_events}")
        print(f"  Successful recoveries: {successful_recoveries}")
        if total_events > 0:
            print(f"  Success rate: {successful_recoveries/total_events*100:.1f}%")

        # Count, per stream, the audited events in which it was active.
        stream_A_count = 0
        stream_B_count = 0
        for entry in system.system_history:
            if 'audit' not in entry:
                continue
            active = entry['audit']['active_streams']
            if 'Stream A: Hypo-coherence' in active:
                stream_A_count += 1
            if 'Stream B: Hyper-coherence' in active:
                stream_B_count += 1

        print(f"  Stream A (hypo) activations: {stream_A_count}")
        print(f"  Stream B (hyper) activations: {stream_B_count}")

        # Persist the plot and the final system state for this run.
        system.visualize_dynamics(history, f"abcr_{scenario_name}_{mode.value}.png")
        system.save_state(f"abcr_state_{scenario_name}_{mode.value}.json")

    print()
    print(rule)
    print("ABCR system demonstration complete!")
    print(rule)
|
|
|
|
|
if __name__ == "__main__":
    # Run the full demonstration only when executed as a script, not on import.
    demonstrate_abcr_system()
|
|
|