Codette-Reasoning / reasoning_forge /coherence_field.py
Raiff1982's picture
Upload 120 files
ed1b365 verified
"""Coherence Field Gamma (Γ) — System Health Stabilization
Phase 5A Critical Infrastructure: Prevents three failure modes in closed-loop reasoning:
1. Weight Drift: Adapter weights concentrate; diversity collapses
2. False Convergence: System reduces conflict but converges on wrong answer
3. Feedback Lock-in: Early bad runs reinforce themselves via memory
Solution: Γ (Gamma) monitors system coherence field and injects stabilizers when
health drops below safe zones. Works alongside Phase 4 runaway detection.
Health Score:
γ ∈ [0, 1] where:
- γ < 0.4: System instability → inject diverse perspective
- 0.4 ≤ γ ≤ 0.8: Healthy zone (maintain status quo)
- γ > 0.8: Groupthink risk → force conflict pair to create productive tension
Components:
1. Conflict Distribution: Are conflicts well-distributed across perspectives?
2. Diversity Index: Are we using multiple perspectives or just 1-2 favorites?
3. Tension Health: Is ξ (epistemic tension) in productive zone [0.1, 0.4]?
4. Coherence Quality: Is coherence maintained while resolving conflicts?
"""
import time
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from enum import Enum
class InterventionType(Enum):
"""Stabilization intervention types."""
DIVERSITY_INJECTION = "diversity_injection" # Inject unused perspective
CONFLICT_INJECTION = "conflict_injection" # Force conflict pair for productive tension
@dataclass
class GammaHealthMetrics:
"""Real-time system health snapshot."""
timestamp: float
avg_conflict_strength: float # Mean conflict strength [0, 1]
perspective_diversity: float # % unique perspectives used [0, 1]
resolution_rate: float # % conflicts resolved this round [0, 1]
adapter_weight_variance: float # Variance in adapter weights (0=equal, 1=concentrated)
epistemic_tension: float # ξ — productive conflict level [0, 1]
coherence_score: float # Ensemble coherence [0, 1]
gamma: float # Composite health score [0, 1]
def is_stable(self) -> bool:
"""Return True if system is in healthy zone."""
return 0.4 <= self.gamma <= 0.8
def is_collapsing(self) -> bool:
"""Return True if system shows instability signs."""
return self.gamma < 0.4
def is_groupthinking(self) -> bool:
"""Return True if system shows groupthink signs."""
return self.gamma > 0.8
@property
def status(self) -> str:
"""Return human-readable status string."""
if self.is_collapsing():
return "collapsing"
elif self.is_groupthinking():
return "groupthinking"
else:
return "stable"
@dataclass
class GammaIntervention:
"""Record of stabilization intervention taken."""
timestamp: float
intervention_type: InterventionType # Type-safe enum instead of string
reason: str # Why intervention was triggered
gamma_before: float # Health score before
recommended_adapter: Optional[str] = None # Which adapter to inject
result: Optional[str] = None # Outcome (filled in after execution)
class CoherenceFieldGamma:
"""Real-time system health monitor and stabilizer.
Tracks epistemic health and intervenes when system drifts toward:
- Monoculture (weight drift, diversity collapse)
- False convergence (low conflict, wrong answer)
- Instability (oscillating weights, conflicting signals)
"""
# Maximum history size before rolling window cleanup
MAX_HEALTH_HISTORY = 1000
MAX_INTERVENTION_LOG = 500
def __init__(self, memory_weighting=None, target_conflict_range: Tuple[float, float] = (0.1, 0.4)):
"""
Args:
memory_weighting: MemoryWeighting instance (for analyzing adapter weights)
target_conflict_range: Ideal epistemic tension zone [low, high]
"""
self.memory_weighting = memory_weighting
self.target_conflict_low, self.target_conflict_high = target_conflict_range
# Use deques with maxlen for bounded memory growth
from collections import deque
self.health_history: deque = deque(maxlen=self.MAX_HEALTH_HISTORY)
self.interventions: deque = deque(maxlen=self.MAX_INTERVENTION_LOG)
self.last_health_check = time.time()
def compute_health(self, conflicts: List, responses: Dict, adapter_weights: Optional[Dict] = None) -> GammaHealthMetrics:
"""Compute Γ (Gamma) health score from current debate state.
Args:
conflicts: List of active conflicts from current round
responses: Dict of {adapter_name: response_text} from debate
adapter_weights: Dict of {adapter_name: weight_float} from MemoryWeighting
Returns:
GammaHealthMetrics with computed gamma and health indicators
"""
# 1. CONFLICT DISTRIBUTION: Are conflicts well-distributed?
avg_conflict_strength = 0.0
conflict_by_adapter = {}
if conflicts:
for conflict in conflicts:
avg_conflict_strength += conflict.strength if hasattr(conflict, 'strength') else 0.5
# Track which adapters are in conflicts
if hasattr(conflict, 'agent_a'):
agent = conflict.agent_a.lower()
conflict_by_adapter[agent] = conflict_by_adapter.get(agent, 0) + 1
if hasattr(conflict, 'agent_b'):
agent = conflict.agent_b.lower()
conflict_by_adapter[agent] = conflict_by_adapter.get(agent, 0) + 1
avg_conflict_strength /= len(conflicts)
else:
avg_conflict_strength = 0.5 # Neutral if no conflicts
# 2. DIVERSITY INDEX: Are we using multiple perspectives?
unique_perspectives = len(set(responses.keys())) if responses else 0
max_perspectives = len(responses) if responses else 1
perspective_diversity = unique_perspectives / max(max_perspectives, 1)
# 3. RESOLUTION RATE: Did we make progress this round?
resolution_rate = 0.5 # Default; updated externally if conflict evolution available
if conflicts:
resolved = sum(1 for c in conflicts if hasattr(c, 'resolution_rate') and c.resolution_rate > 0.4)
resolution_rate = resolved / len(conflicts)
# 4. ADAPTER WEIGHT VARIANCE: Are weights concentrated or distributed?
adapter_weight_variance = 0.0
if adapter_weights:
weights = list(adapter_weights.values())
if len(weights) > 1:
mean_weight = sum(weights) / len(weights)
variance = sum((w - mean_weight) ** 2 for w in weights) / len(weights)
# Normalize variance to [0, 1] where 1 = all weight on one adapter
max_variance = 4.0 # Empirical max for [0, 2.0] weight range
adapter_weight_variance = min(1.0, variance / max_variance)
else:
adapter_weight_variance = 0.5 # Unknown = neutral
# 5. EPISTEMIC TENSION: Is ξ in productive zone?
# ξ = average conflict strength (should be 0.1-0.4 for productive tension)
epistemic_tension = avg_conflict_strength
tension_health = 1.0 - abs(epistemic_tension - 0.25) / 0.15 # Peaked at 0.25
tension_health = max(0.0, min(1.0, tension_health))
# 6. COHERENCE QUALITY: Placeholder (usually from ensemble coherence)
# In integration, this will come from debate metadata
coherence_score = 0.7 # Default; typically overridden by caller
# 7. COMPUTE GAMMA: Composite health score
# γ = w1 * diversity + w2 * tension_health + w3 * (1 - weight_variance) + w4 * resolution_rate
# Weights: equal contribution from each signal
gamma = (
0.25 * perspective_diversity + # More perspectives = healthier
0.25 * tension_health + # Productive tension = healthier
0.25 * (1.0 - adapter_weight_variance) + # Distributed weights = healthier
0.25 * resolution_rate # Making progress = healthier
)
metrics = GammaHealthMetrics(
timestamp=time.time(),
avg_conflict_strength=avg_conflict_strength,
perspective_diversity=perspective_diversity,
resolution_rate=resolution_rate,
adapter_weight_variance=adapter_weight_variance,
epistemic_tension=epistemic_tension,
coherence_score=coherence_score,
gamma=gamma,
)
self.health_history.append(metrics)
return metrics
def get_intervention(self, metrics: GammaHealthMetrics,
available_adapters: List[str]) -> Optional[GammaIntervention]:
"""Determine if system needs stabilization intervention.
Args:
metrics: Current GammaHealthMetrics
available_adapters: List of adapter names available
Returns:
GammaIntervention if action needed, else None
"""
if metrics.is_stable():
return None # Healthy zone — maintain
intervention = None
if metrics.is_collapsing():
# γ < 0.4: System instability detected
# Likely causes: weight drift, low diversity, unresolved conflicts
# Fix: Inject a diverse perspective that hasn't been used recently
unused_adapters = [a for a in available_adapters
if self.memory_weighting is None or
a not in self.memory_weighting.adapter_weights or
self.memory_weighting.adapter_weights[a].interaction_count == 0]
if not unused_adapters:
# All adapters have been used; pick lowest-weight one
if self.memory_weighting and self.memory_weighting.adapter_weights:
unused_adapters = [min(self.memory_weighting.adapter_weights.items(),
key=lambda x: x[1].weight)[0]]
else:
unused_adapters = [available_adapters[0]]
intervention = GammaIntervention(
timestamp=time.time(),
intervention_type=InterventionType.DIVERSITY_INJECTION,
reason=f"System instability detected (γ={metrics.gamma:.2f} < 0.4). "
f"Diversity={metrics.perspective_diversity:.1%}, "
f"Weight variance={metrics.adapter_weight_variance:.1%}. "
f"Injecting diverse perspective to break monoculture.",
gamma_before=metrics.gamma,
recommended_adapter=unused_adapters[0],
)
elif metrics.is_groupthinking():
# γ > 0.8: Groupthink risk
# Too much agreement; system may have converged on wrong answer
# Fix: Force a conflict pair to create productive tension
# Select two adapters with highest complementary potential
if available_adapters and len(available_adapters) >= 2:
# Pick the two most different adapters (by weight or type)
sorted_adapters = sorted(available_adapters)
pair = (sorted_adapters[0], sorted_adapters[-1]) # First and last alphabetically
intervention = GammaIntervention(
timestamp=time.time(),
intervention_type=InterventionType.CONFLICT_INJECTION,
reason=f"Groupthink risk detected (γ={metrics.gamma:.2f} > 0.8). "
f"Low conflict={metrics.epistemic_tension:.2f}, "
f"High diversity={metrics.perspective_diversity:.1%}. "
f"Forcing debate pair to create productive tension.",
gamma_before=metrics.gamma,
recommended_adapter=f"{pair[0]};{pair[1]}", # Semicolon denotes pair
)
if intervention:
self.interventions.append(intervention)
return intervention
def get_summary(self) -> Dict:
"""Return summary of system health trends (API-consistent name)."""
if not self.health_history:
return {}
# Convert deque to list to enable slicing
history_list = list(self.health_history)
interventions_list = list(self.interventions)
recent = history_list[-10:] # Last 10 snapshots
gammas = [m.gamma for m in recent]
tensions = [m.epistemic_tension for m in recent]
diversities = [m.perspective_diversity for m in recent]
return {
"current_gamma": recent[-1].gamma if recent else 0.0,
"avg_gamma": sum(gammas) / len(gammas),
"gamma_trend": "stable" if len(gammas) < 2 else (
"improving" if gammas[-1] > gammas[0] else "degrading"
),
"avg_tension": sum(tensions) / len(tensions),
"avg_diversity": sum(diversities) / len(diversities),
"interventions_total": len(interventions_list),
"interventions_recent": sum(1 for i in interventions_list
if time.time() - i.timestamp < 3600), # Last hour
"status": (
"collapsing" if recent[-1].is_collapsing() else
"groupthinking" if recent[-1].is_groupthinking() else
"stable"
),
}
def export_metrics(self) -> Dict:
"""Export all health metrics for monitoring/logging."""
# Convert deques to lists for serialization (deques can't be directly converted to JSON-safe dicts)
health_list = list(self.health_history)
interventions_list = list(self.interventions)
return {
"health_history": [
{
"timestamp": m.timestamp,
"gamma": m.gamma,
"conflict": m.avg_conflict_strength,
"diversity": m.perspective_diversity,
"resolution": m.resolution_rate,
"weight_variance": m.adapter_weight_variance,
}
for m in health_list[-50:] # Last 50 samples
],
"interventions": [
{
"timestamp": i.timestamp,
"type": i.intervention_type.value, # Convert Enum to string for JSON
"reason": i.reason,
"gamma_before": i.gamma_before,
"recommended": i.recommended_adapter,
"result": i.result,
}
for i in interventions_list[-20:] # Last 20 interventions
],
"summary": self.get_summary(),
}