| """ |
| Phase 6: Specialization Tracker |
| |
| Monitors adapter specialization and prevents semantic convergence. |
| |
| Key metrics: |
| - specialization_score = domain_accuracy / usage_frequency |
| (higher = expert in domain, not overtaxed) |
| - semantic_convergence = similarity between adapter outputs |
| (alert if > 0.85, indicates monoculture within adapters) |
| |
| Prevents: |
| - Weight drift (Phase 5 catches at system level) |
| - Semantic convergence (adapters giving similar answers, Phase 6 catches) |
| """ |
|
|
| from typing import List, Dict, Optional |
| import numpy as np |
| from datetime import datetime |
|
|
|
|
| class SpecializationTracker: |
| """ |
| Tracks per-adapter per-domain performance to maintain specialization |
| and detect when adapters are overlapping semantically. |
| """ |
|
|
| |
| DOMAIN_KEYWORDS = { |
| "physics": ["force", "momentum", "gravity", "quantum", "relativity", "acceleration", "Newton", "energy"], |
| "ethics": ["should", "right", "wrong", "moral", "ethics", "justice", "fair", "values", "good"], |
| "consciousness": ["aware", "conscious", "mind", "self", "experience", "perception", "qualia", "sentient"], |
| "creativity": ["design", "create", "novel", "innovative", "imagine", "artistic", "original", "aesthetic"], |
| "systems": ["system", "architecture", "scalable", "complex", "interdependent", "emergence", "network"], |
| "philosophy": ["meaning", "existence", "truth", "knowledge", "being", "essence", "reasoning"], |
| } |
|
|
| def __init__(self): |
| """Initialize tracking dictionaries.""" |
| self.domain_accuracy = {} |
| self.domain_usage = {} |
| self.domain_last_used = {} |
| self.query_domains = {} |
| self.semantic_convergence_history = [] |
|
|
| def classify_query_domain(self, query: str) -> List[str]: |
| """ |
| Classify query by topic domain using keyword heuristics. |
| |
| Returns: |
| List of domain tags, e.g., ["physics", "ethics"] for multi-domain queries. |
| Returns ["general"] if no keywords match. |
| """ |
| domains = [] |
| query_lower = query.lower() |
|
|
| for domain, keywords in self.DOMAIN_KEYWORDS.items(): |
| if any(k.lower() in query_lower for k in keywords): |
| domains.append(domain) |
|
|
| return domains if domains else ["general"] |
|
|
| def record_adapter_performance(self, adapter: str, query: str, coherence: float): |
| """ |
| Log adapter performance in domain(s) for a query. |
| |
| Args: |
| adapter: Adapter name (e.g., "newton", "empathy") |
| query: Query text |
| coherence: Output coherence score [0, 1] |
| """ |
| domains = self.classify_query_domain(query) |
|
|
| for domain in domains: |
| |
| if adapter not in self.domain_accuracy: |
| self.domain_accuracy[adapter] = {} |
| self.domain_usage[adapter] = {} |
| self.domain_last_used[adapter] = {} |
|
|
| if domain not in self.domain_accuracy[adapter]: |
| self.domain_accuracy[adapter][domain] = [] |
| self.domain_usage[adapter][domain] = 0 |
| self.domain_last_used[adapter][domain] = None |
|
|
| |
| self.domain_accuracy[adapter][domain].append(coherence) |
| self.domain_usage[adapter][domain] += 1 |
| self.domain_last_used[adapter][domain] = datetime.now() |
|
|
| def compute_specialization(self, adapter: str) -> Dict[str, float]: |
| """ |
| Compute specialization_score for each domain an adapter is used in. |
| |
| specialization_score[domain] = mean_accuracy[domain] / usage_frequency[domain] |
| |
| Returns: |
| {domain: specialization_score} for all domains used |
| Higher = more specialized (good performance, not overused) |
| """ |
| if adapter not in self.domain_accuracy: |
| return {} |
|
|
| specialization = {} |
|
|
| for domain in self.domain_accuracy[adapter]: |
| accuracies = self.domain_accuracy[adapter][domain] |
| usage = self.domain_usage[adapter][domain] |
|
|
| mean_accuracy = float(np.mean(accuracies)) if accuracies else 0.5 |
| |
| specialization[domain] = mean_accuracy / max(usage, 1) |
|
|
| return specialization |
|
|
| def get_global_specialization(self) -> Dict[str, Dict[str, float]]: |
| """ |
| Compute specialization scores for all adapters. |
| |
| Returns: |
| {adapter: {domain: specialization_score}} |
| """ |
| return {adapter: self.compute_specialization(adapter) for adapter in self.domain_accuracy.keys()} |
|
|
| def detect_domain_expert(self, domain: str) -> Optional[str]: |
| """ |
| Find best-performing adapter for a specific domain. |
| |
| Returns: |
| Adapter name with highest specialization in domain, or None |
| """ |
| specs = self.get_global_specialization() |
| experts = {a: s.get(domain, 0) for a, s in specs.items() if domain in s} |
|
|
| if not experts: |
| return None |
|
|
| return max(experts.keys(), key=lambda a: experts[a]) |
|
|
| def detect_semantic_convergence( |
| self, adapter_outputs: Dict[str, str], semantic_engine=None, threshold: float = 0.85 |
| ) -> Dict: |
| """ |
| Measure overlap between adapter outputs on same query. |
| |
| Alerts if any pair similarity > threshold (converging). |
| |
| Args: |
| adapter_outputs: {adapter_name: output_text} |
| semantic_engine: SemanticTensionEngine instance (optional, for real embeddings) |
| threshold: Similarity threshold for convergence alert |
| |
| Returns: |
| { |
| "convergent_pairs": [{pair, similarity, risk}], |
| "max_similarity": float, |
| "has_convergence": bool, |
| } |
| """ |
| if len(adapter_outputs) < 2: |
| return {"convergent_pairs": [], "max_similarity": 0.0, "has_convergence": False} |
|
|
| convergent_pairs = [] |
| max_similarity = 0.0 |
|
|
| adapters = list(adapter_outputs.keys()) |
|
|
| for i, a1 in enumerate(adapters): |
| for a2 in adapters[i + 1 :]: |
| output_a = adapter_outputs[a1] |
| output_b = adapter_outputs[a2] |
|
|
| |
| if semantic_engine: |
| try: |
| tension = semantic_engine.compute_semantic_tension(output_a, output_b) |
| similarity = 1.0 - tension |
| except Exception: |
| |
| similarity = self._text_similarity(output_a, output_b) |
| else: |
| |
| similarity = self._text_similarity(output_a, output_b) |
|
|
| max_similarity = max(max_similarity, similarity) |
|
|
| if similarity > threshold: |
| convergent_pairs.append({ |
| "adapter_a": a1, |
| "adapter_b": a2, |
| "similarity": round(similarity, 3), |
| "convergence_risk": "HIGH" if similarity > 0.92 else "MEDIUM", |
| }) |
|
|
| has_convergence = len(convergent_pairs) > 0 |
|
|
| record = { |
| "timestamp": datetime.now().isoformat(), |
| "convergent_pairs": convergent_pairs, |
| "max_similarity": round(max_similarity, 3), |
| "has_convergence": has_convergence, |
| "num_adapters": len(adapter_outputs), |
| } |
|
|
| self.semantic_convergence_history.append(record) |
|
|
| return record |
|
|
| def _text_similarity(self, text_a: str, text_b: str) -> float: |
| """ |
| Simple text similarity fallback: Jaccard similarity on tokens. |
| |
| Args: |
| text_a, text_b: Text strings |
| |
| Returns: |
| Similarity in [0, 1] |
| """ |
| tokens_a = set(text_a.lower().split()) |
| tokens_b = set(text_b.lower().split()) |
|
|
| if not tokens_a or not tokens_b: |
| return 0.0 |
|
|
| intersection = len(tokens_a & tokens_b) |
| union = len(tokens_a | tokens_b) |
|
|
| return intersection / max(union, 1) |
|
|
| def get_adapter_health(self, adapter: str) -> Dict: |
| """ |
| Get overall health score for an adapter. |
| |
| Returns: |
| { |
| "adapter": adapter, |
| "num_domains": int, |
| "avg_accuracy": float, |
| "total_usage": int, |
| "specialization_avg": float, |
| "recommendation": str |
| } |
| """ |
| if adapter not in self.domain_accuracy: |
| return {"error": f"No data for adapter {adapter}"} |
|
|
| accuracies_all = [] |
| usage_total = 0 |
|
|
| for domain in self.domain_accuracy[adapter]: |
| accuracies_all.extend(self.domain_accuracy[adapter][domain]) |
| usage_total += self.domain_usage[adapter][domain] |
|
|
| avg_accuracy = float(np.mean(accuracies_all)) if accuracies_all else 0.5 |
| specs = self.compute_specialization(adapter) |
| spec_avg = float(np.mean(list(specs.values()))) if specs else 0.5 |
|
|
| |
| if spec_avg > 0.1 and avg_accuracy > 0.75: |
| recommendation = "excellent_specialist" |
| elif spec_avg > 0.05 and avg_accuracy > 0.6: |
| recommendation = "good_generalist" |
| elif usage_total > 20 and avg_accuracy < 0.5: |
| recommendation = "overused_poorly" |
| else: |
| recommendation = "maintain_current" |
|
|
| return { |
| "adapter": adapter, |
| "num_domains": len(self.domain_accuracy[adapter]), |
| "avg_accuracy": round(avg_accuracy, 3), |
| "total_usage": usage_total, |
| "specialization_avg": round(spec_avg, 3), |
| "recommendation": recommendation, |
| "domain_specializations": {d: round(s, 3) for d, s in specs.items()}, |
| } |
|
|
| def get_system_health(self) -> Dict: |
| """ |
| Get overall system specialization health. |
| |
| Returns: |
| Flags convergence risks, identifies experts, recommends actions. |
| """ |
| health_by_adapter = {adapter: self.get_adapter_health(adapter) for adapter in self.domain_accuracy.keys()} |
|
|
| overused = [a for a, h in health_by_adapter.items() if h.get("recommendation") == "overused_poorly"] |
| excellent = [a for a, h in health_by_adapter.items() if h.get("recommendation") == "excellent_specialist"] |
| experts = {domain: self.detect_domain_expert(domain) for domain in self.DOMAIN_KEYWORDS.keys()} |
|
|
| return { |
| "timestamp": datetime.now().isoformat(), |
| "total_adapters": len(health_by_adapter), |
| "health_by_adapter": health_by_adapter, |
| "overused_adapters": overused, |
| "specialist_adapters": excellent, |
| "domain_experts": experts, |
| "convergence_alerts": self.semantic_convergence_history[-5:] if self.semantic_convergence_history else [], |
| } |
|
|
| def export_summary(self) -> Dict: |
| """Export complete specialization data for analysis.""" |
| return { |
| "timestamp": datetime.now().isoformat(), |
| "global_specialization": self.get_global_specialization(), |
| "system_health": self.get_system_health(), |
| "convergence_history": self.semantic_convergence_history, |
| } |
|
|
|
|
| __all__ = ["SpecializationTracker"] |
|
|