""" Phase 6: Specialization Tracker Monitors adapter specialization and prevents semantic convergence. Key metrics: - specialization_score = domain_accuracy / usage_frequency (higher = expert in domain, not overtaxed) - semantic_convergence = similarity between adapter outputs (alert if > 0.85, indicates monoculture within adapters) Prevents: - Weight drift (Phase 5 catches at system level) - Semantic convergence (adapters giving similar answers, Phase 6 catches) """ from typing import List, Dict, Optional import numpy as np from datetime import datetime class SpecializationTracker: """ Tracks per-adapter per-domain performance to maintain specialization and detect when adapters are overlapping semantically. """ # Domain keywords for query classification DOMAIN_KEYWORDS = { "physics": ["force", "momentum", "gravity", "quantum", "relativity", "acceleration", "Newton", "energy"], "ethics": ["should", "right", "wrong", "moral", "ethics", "justice", "fair", "values", "good"], "consciousness": ["aware", "conscious", "mind", "self", "experience", "perception", "qualia", "sentient"], "creativity": ["design", "create", "novel", "innovative", "imagine", "artistic", "original", "aesthetic"], "systems": ["system", "architecture", "scalable", "complex", "interdependent", "emergence", "network"], "philosophy": ["meaning", "existence", "truth", "knowledge", "being", "essence", "reasoning"], } def __init__(self): """Initialize tracking dictionaries.""" self.domain_accuracy = {} # {adapter: {domain: [coherence_scores]}} self.domain_usage = {} # {adapter: {domain: count}} self.domain_last_used = {} # {adapter: {domain: timestamp}} self.query_domains = {} # {query_id: [domain_tags]} self.semantic_convergence_history = [] # Track convergence over time def classify_query_domain(self, query: str) -> List[str]: """ Classify query by topic domain using keyword heuristics. Returns: List of domain tags, e.g., ["physics", "ethics"] for multi-domain queries. Returns ["general"] if no keywords match. """ domains = [] query_lower = query.lower() for domain, keywords in self.DOMAIN_KEYWORDS.items(): if any(k.lower() in query_lower for k in keywords): domains.append(domain) return domains if domains else ["general"] def record_adapter_performance(self, adapter: str, query: str, coherence: float): """ Log adapter performance in domain(s) for a query. Args: adapter: Adapter name (e.g., "newton", "empathy") query: Query text coherence: Output coherence score [0, 1] """ domains = self.classify_query_domain(query) for domain in domains: # Initialize if needed if adapter not in self.domain_accuracy: self.domain_accuracy[adapter] = {} self.domain_usage[adapter] = {} self.domain_last_used[adapter] = {} if domain not in self.domain_accuracy[adapter]: self.domain_accuracy[adapter][domain] = [] self.domain_usage[adapter][domain] = 0 self.domain_last_used[adapter][domain] = None # Record coherence and increment usage self.domain_accuracy[adapter][domain].append(coherence) self.domain_usage[adapter][domain] += 1 self.domain_last_used[adapter][domain] = datetime.now() def compute_specialization(self, adapter: str) -> Dict[str, float]: """ Compute specialization_score for each domain an adapter is used in. 
    def compute_specialization(self, adapter: str) -> Dict[str, float]:
        """
        Compute specialization_score for each domain an adapter is used in.

            specialization_score[domain] =
                mean_accuracy[domain] / usage_frequency[domain]

        Returns:
            {domain: specialization_score} for all domains used.
            Higher = more specialized (good performance, not overused).
        """
        if adapter not in self.domain_accuracy:
            return {}

        specialization = {}
        for domain in self.domain_accuracy[adapter]:
            accuracies = self.domain_accuracy[adapter][domain]
            usage = self.domain_usage[adapter][domain]
            mean_accuracy = float(np.mean(accuracies)) if accuracies else 0.5
            # max() avoids division by zero; dividing by usage is a natural
            # penalty for heavily used adapters.
            specialization[domain] = mean_accuracy / max(usage, 1)
        return specialization

    def get_global_specialization(self) -> Dict[str, Dict[str, float]]:
        """
        Compute specialization scores for all adapters.

        Returns:
            {adapter: {domain: specialization_score}}
        """
        return {adapter: self.compute_specialization(adapter)
                for adapter in self.domain_accuracy.keys()}

    def detect_domain_expert(self, domain: str) -> Optional[str]:
        """
        Find the best-performing adapter for a specific domain.

        Returns:
            Adapter name with the highest specialization in the domain,
            or None if no adapter has been used in it.
        """
        specs = self.get_global_specialization()
        experts = {a: s[domain] for a, s in specs.items() if domain in s}
        if not experts:
            return None
        return max(experts, key=experts.get)

    def detect_semantic_convergence(
        self,
        adapter_outputs: Dict[str, str],
        semantic_engine=None,
        threshold: float = 0.85,
    ) -> Dict:
        """
        Measure overlap between adapter outputs on the same query.
        Alerts if any pair's similarity exceeds the threshold (converging).

        Args:
            adapter_outputs: {adapter_name: output_text}
            semantic_engine: SemanticTensionEngine instance (optional, for
                real embeddings)
            threshold: Similarity threshold for a convergence alert

        Returns:
            {
                "convergent_pairs": [{pair, similarity, risk}],
                "max_similarity": float,
                "has_convergence": bool,
            }
        """
        if len(adapter_outputs) < 2:
            return {"convergent_pairs": [], "max_similarity": 0.0,
                    "has_convergence": False}

        convergent_pairs = []
        max_similarity = 0.0
        adapters = list(adapter_outputs.keys())

        for i, a1 in enumerate(adapters):
            for a2 in adapters[i + 1:]:
                output_a = adapter_outputs[a1]
                output_b = adapter_outputs[a2]

                # Compute similarity (use the semantic engine if available)
                if semantic_engine:
                    try:
                        tension = semantic_engine.compute_semantic_tension(
                            output_a, output_b)
                        similarity = 1.0 - tension
                    except Exception:
                        # Fallback to text overlap
                        similarity = self._text_similarity(output_a, output_b)
                else:
                    # Simple fallback: token overlap
                    similarity = self._text_similarity(output_a, output_b)

                max_similarity = max(max_similarity, similarity)

                if similarity > threshold:
                    convergent_pairs.append({
                        "adapter_a": a1,
                        "adapter_b": a2,
                        "similarity": round(similarity, 3),
                        "convergence_risk": "HIGH" if similarity > 0.92 else "MEDIUM",
                    })

        has_convergence = len(convergent_pairs) > 0
        record = {
            "timestamp": datetime.now().isoformat(),
            "convergent_pairs": convergent_pairs,
            "max_similarity": round(max_similarity, 3),
            "has_convergence": has_convergence,
            "num_adapters": len(adapter_outputs),
        }
        self.semantic_convergence_history.append(record)
        return record
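    # Worked example for the fallback below (hypothetical strings): "the cat
    # sat" vs "the cat ran" share 2 of 4 distinct tokens, so the Jaccard
    # similarity is 2 / 4 = 0.5.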
    def _text_similarity(self, text_a: str, text_b: str) -> float:
        """
        Simple text-similarity fallback: Jaccard similarity on tokens.

        Args:
            text_a, text_b: Text strings

        Returns:
            Similarity in [0, 1]
        """
        tokens_a = set(text_a.lower().split())
        tokens_b = set(text_b.lower().split())
        if not tokens_a or not tokens_b:
            return 0.0
        intersection = len(tokens_a & tokens_b)
        union = len(tokens_a | tokens_b)
        return intersection / max(union, 1)

    def get_adapter_health(self, adapter: str) -> Dict:
        """
        Get an overall health score for an adapter.

        Returns:
            {
                "adapter": adapter,
                "num_domains": int,
                "avg_accuracy": float,
                "total_usage": int,
                "specialization_avg": float,
                "recommendation": str,
            }
        """
        if adapter not in self.domain_accuracy:
            return {"error": f"No data for adapter {adapter}"}

        accuracies_all = []
        usage_total = 0
        for domain in self.domain_accuracy[adapter]:
            accuracies_all.extend(self.domain_accuracy[adapter][domain])
            usage_total += self.domain_usage[adapter][domain]

        avg_accuracy = float(np.mean(accuracies_all)) if accuracies_all else 0.5
        specs = self.compute_specialization(adapter)
        spec_avg = float(np.mean(list(specs.values()))) if specs else 0.5

        # Generate a recommendation from heuristic thresholds
        if spec_avg > 0.1 and avg_accuracy > 0.75:
            recommendation = "excellent_specialist"
        elif spec_avg > 0.05 and avg_accuracy > 0.6:
            recommendation = "good_generalist"
        elif usage_total > 20 and avg_accuracy < 0.5:
            recommendation = "overused_poorly"
        else:
            recommendation = "maintain_current"

        return {
            "adapter": adapter,
            "num_domains": len(self.domain_accuracy[adapter]),
            "avg_accuracy": round(avg_accuracy, 3),
            "total_usage": usage_total,
            "specialization_avg": round(spec_avg, 3),
            "recommendation": recommendation,
            "domain_specializations": {d: round(s, 3) for d, s in specs.items()},
        }

    def get_system_health(self) -> Dict:
        """
        Get overall system specialization health.

        Flags convergence risks, identifies experts, recommends actions.
        """
        health_by_adapter = {adapter: self.get_adapter_health(adapter)
                             for adapter in self.domain_accuracy.keys()}

        overused = [a for a, h in health_by_adapter.items()
                    if h.get("recommendation") == "overused_poorly"]
        excellent = [a for a, h in health_by_adapter.items()
                     if h.get("recommendation") == "excellent_specialist"]
        experts = {domain: self.detect_domain_expert(domain)
                   for domain in self.DOMAIN_KEYWORDS.keys()}

        return {
            "timestamp": datetime.now().isoformat(),
            "total_adapters": len(health_by_adapter),
            "health_by_adapter": health_by_adapter,
            "overused_adapters": overused,
            "specialist_adapters": excellent,
            "domain_experts": experts,
            "convergence_alerts": self.semantic_convergence_history[-5:] if self.semantic_convergence_history else [],
        }

    def export_summary(self) -> Dict:
        """Export complete specialization data for analysis."""
        return {
            "timestamp": datetime.now().isoformat(),
            "global_specialization": self.get_global_specialization(),
            "system_health": self.get_system_health(),
            "convergence_history": self.semantic_convergence_history,
        }


__all__ = ["SpecializationTracker"]
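
# ---------------------------------------------------------------------------
# Usage sketch: a minimal, hypothetical demo of the tracker's main entry
# points. The adapter names, queries, and coherence scores below are made up
# for illustration. No semantic engine is passed, so convergence detection
# falls back to Jaccard token overlap.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    tracker = SpecializationTracker()

    # Log a few (adapter, query, coherence) observations.
    tracker.record_adapter_performance("newton", "How does gravity affect momentum?", 0.9)
    tracker.record_adapter_performance("newton", "Is it right to break a promise?", 0.4)
    tracker.record_adapter_performance("empathy", "Is it right to break a promise?", 0.85)

    # Per-domain specialization: mean coherence / usage count.
    print(tracker.compute_specialization("newton"))  # {'physics': 0.9, 'ethics': 0.4}

    # Highest-specialization adapter for a domain.
    print(tracker.detect_domain_expert("ethics"))    # 'empathy'

    # Convergence check on two near-identical outputs: 9 of 10 distinct
    # tokens overlap, so similarity = 0.9 > 0.85 and the alert fires.
    report = tracker.detect_semantic_convergence({
        "newton": "force equals mass times acceleration in every inertial frame",
        "empathy": "force equals mass times acceleration in every inertial frame too",
    })
    print(report["has_convergence"], report["max_similarity"])  # True 0.9

    # System-level rollup.
    print(tracker.get_system_health()["domain_experts"])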