File size: 11,786 Bytes
ed1b365
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
"""
Phase 6: Specialization Tracker

Monitors adapter specialization and prevents semantic convergence.

Key metrics:
- specialization_score = domain_accuracy / usage_frequency
  (higher = expert in domain, not overtaxed)
- semantic_convergence = similarity between adapter outputs
  (alert if > 0.85, indicates monoculture within adapters)

Guards against adapter monoculture:
- Weight drift (caught at the system level by Phase 5)
- Semantic convergence (adapters giving similar answers; caught here, in Phase 6)
"""

from typing import List, Dict, Optional
import numpy as np
from datetime import datetime


class SpecializationTracker:
    """
    Tracks per-adapter, per-domain performance to maintain specialization
    and detect when adapters are overlapping semantically.

    Two core signals:
    - specialization_score[domain] = mean coherence in domain / usage count
      (higher = good performance in a domain without being overtaxed)
    - semantic convergence = pairwise similarity between adapter outputs on
      the same query (alert above a threshold, default 0.85)
    """

    # Domain keywords for query classification (public attribute; values are
    # matched case-insensitively as substrings of the query text).
    DOMAIN_KEYWORDS = {
        "physics": ["force", "momentum", "gravity", "quantum", "relativity", "acceleration", "Newton", "energy"],
        "ethics": ["should", "right", "wrong", "moral", "ethics", "justice", "fair", "values", "good"],
        "consciousness": ["aware", "conscious", "mind", "self", "experience", "perception", "qualia", "sentient"],
        "creativity": ["design", "create", "novel", "innovative", "imagine", "artistic", "original", "aesthetic"],
        "systems": ["system", "architecture", "scalable", "complex", "interdependent", "emergence", "network"],
        "philosophy": ["meaning", "existence", "truth", "knowledge", "being", "essence", "reasoning"],
    }

    # Keywords pre-lowered once at class-creation time so that
    # classify_query_domain() does not re-lower every keyword on every call.
    _KEYWORDS_LOWER = {
        domain: tuple(keyword.lower() for keyword in keywords)
        for domain, keywords in DOMAIN_KEYWORDS.items()
    }

    def __init__(self):
        """Initialize empty tracking state."""
        self.domain_accuracy = {}      # {adapter: {domain: [coherence scores]}}
        self.domain_usage = {}         # {adapter: {domain: usage count}}
        self.domain_last_used = {}     # {adapter: {domain: datetime of last use}}
        self.query_domains = {}        # {query_id: [domain tags]} — not written here; reserved for callers
        self.semantic_convergence_history = []  # chronological convergence records

    def classify_query_domain(self, query: str) -> List[str]:
        """
        Classify a query into topic domains via keyword heuristics.

        Matching is case-insensitive substring containment, which also catches
        inflected forms ("forces") at the cost of occasional false positives
        ("workforce" -> physics).

        Returns:
            List of matching domain tags (possibly several for multi-domain
            queries), or ["general"] when no keyword matches.
        """
        query_lower = query.lower()
        matched = [
            domain
            for domain, keywords in self._KEYWORDS_LOWER.items()
            if any(keyword in query_lower for keyword in keywords)
        ]
        return matched or ["general"]

    def record_adapter_performance(self, adapter: str, query: str, coherence: float):
        """
        Log adapter performance for every domain the query belongs to.

        Args:
            adapter: Adapter name (e.g., "newton", "empathy")
            query: Query text (classified via classify_query_domain)
            coherence: Output coherence score in [0, 1]
        """
        # setdefault keeps the three parallel per-adapter dicts in sync
        # without explicit membership checks.
        accuracy = self.domain_accuracy.setdefault(adapter, {})
        usage = self.domain_usage.setdefault(adapter, {})
        last_used = self.domain_last_used.setdefault(adapter, {})

        for domain in self.classify_query_domain(query):
            accuracy.setdefault(domain, []).append(coherence)
            usage[domain] = usage.get(domain, 0) + 1
            # NOTE(review): naive local time, matching the rest of the module;
            # consider datetime.now(tz=timezone.utc) if records cross machines.
            last_used[domain] = datetime.now()

    def compute_specialization(self, adapter: str) -> Dict[str, float]:
        """
        Compute specialization_score for each domain the adapter was used in.

        specialization_score[domain] = mean coherence in domain / usage count,
        so heavy usage naturally depresses the score (an "overtaxed" penalty).

        Returns:
            {domain: specialization_score}; empty dict for unknown adapters.
        """
        if adapter not in self.domain_accuracy:
            return {}

        specialization = {}
        for domain, scores in self.domain_accuracy[adapter].items():
            usage = self.domain_usage[adapter][domain]
            # 0.5 is a neutral prior for the (normally impossible) empty case.
            mean_accuracy = float(np.mean(scores)) if scores else 0.5
            # max(usage, 1) guards against division by zero.
            specialization[domain] = mean_accuracy / max(usage, 1)

        return specialization

    def get_global_specialization(self) -> Dict[str, Dict[str, float]]:
        """
        Compute specialization scores for every tracked adapter.

        Returns:
            {adapter: {domain: specialization_score}}
        """
        return {adapter: self.compute_specialization(adapter) for adapter in self.domain_accuracy.keys()}

    def detect_domain_expert(self, domain: str) -> Optional[str]:
        """
        Find the best-performing adapter for a specific domain.

        Returns:
            Adapter name with the highest specialization score in the domain,
            or None if no adapter has been used in it.
        """
        candidates = {
            adapter: spec[domain]
            for adapter, spec in self.get_global_specialization().items()
            if domain in spec
        }
        if not candidates:
            return None
        return max(candidates, key=candidates.get)

    def _pairwise_similarity(self, output_a: str, output_b: str, semantic_engine) -> float:
        """
        Similarity of two outputs: semantic engine when available
        (similarity = 1 - tension), otherwise the lexical Jaccard fallback.
        """
        if semantic_engine:
            try:
                # NOTE(review): assumes compute_semantic_tension returns a value
                # in [0, 1] so that 1 - tension is a similarity — confirm.
                return 1.0 - semantic_engine.compute_semantic_tension(output_a, output_b)
            except Exception:
                pass  # engine failed; fall through to the lexical fallback
        return self._text_similarity(output_a, output_b)

    def detect_semantic_convergence(
        self, adapter_outputs: Dict[str, str], semantic_engine=None, threshold: float = 0.85
    ) -> Dict:
        """
        Measure pairwise overlap between adapter outputs on the same query.

        A pair is flagged as convergent when its similarity exceeds
        ``threshold``; every call with >= 2 outputs appends a record to
        semantic_convergence_history.

        Args:
            adapter_outputs: {adapter_name: output_text}
            semantic_engine: SemanticTensionEngine instance (optional, for real embeddings)
            threshold: similarity above which a pair is flagged

        Returns:
            {
                "timestamp": ISO timestamp,
                "convergent_pairs": [{adapter_a, adapter_b, similarity, convergence_risk}],
                "max_similarity": float,
                "has_convergence": bool,
                "num_adapters": int,
            }
            Fewer than two outputs short-circuits to an empty, unrecorded result.
        """
        if len(adapter_outputs) < 2:
            return {"convergent_pairs": [], "max_similarity": 0.0, "has_convergence": False}

        adapters = list(adapter_outputs.keys())
        convergent_pairs = []
        max_similarity = 0.0

        # All unordered pairs (i < j).
        for i, adapter_a in enumerate(adapters):
            for adapter_b in adapters[i + 1:]:
                similarity = self._pairwise_similarity(
                    adapter_outputs[adapter_a], adapter_outputs[adapter_b], semantic_engine
                )
                max_similarity = max(max_similarity, similarity)

                if similarity > threshold:
                    convergent_pairs.append({
                        "adapter_a": adapter_a,
                        "adapter_b": adapter_b,
                        "similarity": round(similarity, 3),
                        # > 0.92 is near-duplication; treat as high risk.
                        "convergence_risk": "HIGH" if similarity > 0.92 else "MEDIUM",
                    })

        record = {
            "timestamp": datetime.now().isoformat(),
            "convergent_pairs": convergent_pairs,
            "max_similarity": round(max_similarity, 3),
            "has_convergence": bool(convergent_pairs),
            "num_adapters": len(adapter_outputs),
        }
        self.semantic_convergence_history.append(record)
        return record

    def _text_similarity(self, text_a: str, text_b: str) -> float:
        """
        Lexical fallback: Jaccard similarity over lowercase whitespace tokens.

        Args:
            text_a, text_b: Text strings

        Returns:
            Similarity in [0, 1]; 0.0 when either text has no tokens.
        """
        tokens_a = set(text_a.lower().split())
        tokens_b = set(text_b.lower().split())

        if not tokens_a or not tokens_b:
            return 0.0

        # Both sets are non-empty here, so the union is never zero.
        return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)

    def get_adapter_health(self, adapter: str) -> Dict:
        """
        Summarize an adapter's overall health.

        Returns:
            {
                "adapter": adapter,
                "num_domains": int,
                "avg_accuracy": float,
                "total_usage": int,
                "specialization_avg": float,
                "recommendation": str,
                "domain_specializations": {domain: score},
            }
            or {"error": ...} for an unknown adapter.
        """
        if adapter not in self.domain_accuracy:
            return {"error": f"No data for adapter {adapter}"}

        accuracies_all = []
        usage_total = 0
        for domain, scores in self.domain_accuracy[adapter].items():
            accuracies_all.extend(scores)
            usage_total += self.domain_usage[adapter][domain]

        avg_accuracy = float(np.mean(accuracies_all)) if accuracies_all else 0.5
        specs = self.compute_specialization(adapter)
        spec_avg = float(np.mean(list(specs.values()))) if specs else 0.5

        # Heuristic recommendation buckets; thresholds are tuning knobs.
        if spec_avg > 0.1 and avg_accuracy > 0.75:
            recommendation = "excellent_specialist"
        elif spec_avg > 0.05 and avg_accuracy > 0.6:
            recommendation = "good_generalist"
        elif usage_total > 20 and avg_accuracy < 0.5:
            recommendation = "overused_poorly"
        else:
            recommendation = "maintain_current"

        return {
            "adapter": adapter,
            "num_domains": len(self.domain_accuracy[adapter]),
            "avg_accuracy": round(avg_accuracy, 3),
            "total_usage": usage_total,
            "specialization_avg": round(spec_avg, 3),
            "recommendation": recommendation,
            "domain_specializations": {d: round(s, 3) for d, s in specs.items()},
        }

    def get_system_health(self) -> Dict:
        """
        Summarize system-wide specialization health.

        Returns:
            Dict flagging convergence risks (last 5 records), identifying
            per-domain experts, and listing overused/specialist adapters.
        """
        health_by_adapter = {adapter: self.get_adapter_health(adapter) for adapter in self.domain_accuracy.keys()}

        overused = [a for a, h in health_by_adapter.items() if h.get("recommendation") == "overused_poorly"]
        excellent = [a for a, h in health_by_adapter.items() if h.get("recommendation") == "excellent_specialist"]
        experts = {domain: self.detect_domain_expert(domain) for domain in self.DOMAIN_KEYWORDS.keys()}

        return {
            "timestamp": datetime.now().isoformat(),
            "total_adapters": len(health_by_adapter),
            "health_by_adapter": health_by_adapter,
            "overused_adapters": overused,
            "specialist_adapters": excellent,
            "domain_experts": experts,
            "convergence_alerts": self.semantic_convergence_history[-5:] if self.semantic_convergence_history else [],
        }

    def export_summary(self) -> Dict:
        """Export complete specialization data for offline analysis."""
        return {
            "timestamp": datetime.now().isoformat(),
            "global_specialization": self.get_global_specialization(),
            "system_health": self.get_system_health(),
            "convergence_history": self.semantic_convergence_history,
        }


__all__ = ["SpecializationTracker"]