File size: 18,350 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
"""
Core learning engine that coordinates all learning activities.

Orchestrates knowledge ingestion, experience learning, prompt evolution,
skill distillation, trust management, and freshness management.
"""

import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from collections import Counter

from .knowledge_ingestor import KnowledgeIngestor
from .knowledge_store import KnowledgeStore
from .prompt_optimizer import PromptOptimizer
from .skill_distiller import SkillDistiller
from .trust_manager import TrustManager

logger = logging.getLogger(__name__)


class LearningEngine:
    """Coordinates all learning activities."""

    def __init__(
        self,
        knowledge_store: KnowledgeStore,
        knowledge_ingestor: KnowledgeIngestor,
        prompt_optimizer: Optional[PromptOptimizer] = None,
        skill_distiller: Optional[SkillDistiller] = None,
        trust_manager: Optional[TrustManager] = None,
    ):
        self.knowledge_store = knowledge_store
        self.knowledge_ingestor = knowledge_ingestor
        self.prompt_optimizer = prompt_optimizer
        self.skill_distiller = skill_distiller
        self.trust_manager = trust_manager
        self.last_run: Dict[str, str] = {}

        # In-memory learning metadata (flushed periodically)
        self._case_learnings: List[Dict[str, Any]] = []
        self._route_stats: Counter = Counter()
        self._provider_stats: Dict[str, Dict[str, int]] = {}
        self._prompt_performance: Dict[str, Dict[str, Any]] = {}

    # ── Knowledge Ingestion ──────────────────────────────────────────────────

    async def run_knowledge_ingestion(self, topics: list[str]) -> Dict[str, Any]:
        """
        Run knowledge ingestion for specified topics.

        Args:
            topics: List of topics to ingest knowledge about

        Returns:
            Ingestion results
        """
        logger.info(f"Running knowledge ingestion for topics: {topics}")

        total_items = 0
        results = []

        for topic in topics:
            search_items = await self.knowledge_ingestor.ingest_from_search(topic)
            for item in search_items:
                self.knowledge_store.save_knowledge(item)
                total_items += 1

            news_items = await self.knowledge_ingestor.ingest_from_news(topic)
            for item in news_items:
                self.knowledge_store.save_knowledge(item)
                total_items += 1

            results.append({
                "topic": topic,
                "search_items": len(search_items),
                "news_items": len(news_items),
            })

        self.last_run["knowledge_ingestion"] = datetime.utcnow().isoformat()

        logger.info(f"Knowledge ingestion complete: {total_items} items ingested")

        return {
            "total_items": total_items,
            "results": results,
            "timestamp": self.last_run["knowledge_ingestion"],
        }

    async def run_cleanup(self, expiration_days: int = 30) -> Dict[str, Any]:
        """Run cleanup of expired knowledge."""
        logger.info("Running knowledge cleanup")

        deleted_count = self.knowledge_store.delete_expired_knowledge(expiration_days)
        self.last_run["cleanup"] = datetime.utcnow().isoformat()

        return {
            "deleted_count": deleted_count,
            "timestamp": self.last_run["cleanup"],
        }

    # ── Experience Learning (Task 34) ────────────────────────────────────────

    def learn_from_case(self, case_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract learning metadata from a completed case execution.

        Args:
            case_data: Complete case payload

        Returns:
            Learning metadata extracted
        """
        case_id = case_data.get("case_id", "unknown")
        logger.info(f"Learning from case {case_id}")

        route = case_data.get("route", {})
        outputs = case_data.get("outputs") or self._derive_outputs(case_data)

        # 1. Track route effectiveness
        route_key = f"{route.get('domain_pack', 'general')}:{route.get('execution_mode', 'standard')}"
        self._route_stats[route_key] += 1

        # 2. Track which agents produced useful output
        agents_used = []
        agent_quality = {}
        for output in outputs:
            if isinstance(output, dict):
                agent_name = output.get("agent", "unknown")
                summary = output.get("summary", "")
                confidence = output.get("confidence", 0.0)
                agents_used.append(agent_name)
                agent_quality[agent_name] = {
                    "output_length": len(summary),
                    "confidence": confidence,
                    "produced_output": len(summary) > 10,
                }

        # 3. Build learning record
        learning = {
            "case_id": case_id,
            "route": route,
            "agents_used": agents_used,
            "agent_quality": agent_quality,
            "domain": route.get("domain_pack", "general"),
            "complexity": route.get("complexity", "medium"),
            "execution_mode": route.get("execution_mode", "standard"),
            "learned_at": datetime.utcnow().isoformat(),
        }

        self._case_learnings.append(learning)

        # Keep only last 500 learnings in memory
        if len(self._case_learnings) > 500:
            self._case_learnings = self._case_learnings[-500:]

        # 4. Update trust scores for sources mentioned in research
        if self.trust_manager:
            for output in outputs:
                if isinstance(output, dict) and output.get("agent") == "research":
                    sources = output.get("details", {}).get("sources", [])
                    for source in sources:
                        if isinstance(source, str):
                            self.trust_manager.update_trust(source, True, weight=0.5)

        logger.info(f"Learned from case {case_id}: route={route_key}, agents={agents_used}")
        return learning

    def _derive_outputs(self, case_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Backfill agent outputs from the current case shape."""
        outputs: List[Dict[str, Any]] = []

        def _append(agent: str, details: Dict[str, Any]) -> None:
            if not isinstance(details, dict) or not details:
                return
            summary = (
                details.get("summary")
                or details.get("response")
                or details.get("estimated_output")
                or ""
            )
            outputs.append(
                {
                    "agent": agent,
                    "summary": str(summary),
                    "confidence": float(details.get("confidence", 0.0) or 0.0),
                    "details": details,
                }
            )

        _append("research", case_data.get("research", {}))
        _append("planner", case_data.get("planner", {}))
        _append("verifier", case_data.get("verifier", {}))
        _append("synthesizer", case_data.get("final", {}))
        return outputs

    def detect_patterns(self, min_frequency: int = 3) -> List[Dict[str, Any]]:
        """
        Detect patterns in recent case executions.

        Args:
            min_frequency: Minimum occurrences to count as a pattern

        Returns:
            List of detected patterns
        """
        if len(self._case_learnings) < min_frequency:
            return []

        patterns = []

        # Pattern 1: Domain frequency
        domain_counts = Counter(l["domain"] for l in self._case_learnings)
        for domain, count in domain_counts.items():
            if count >= min_frequency:
                patterns.append({
                    "type": "domain_frequency",
                    "domain": domain,
                    "count": count,
                    "percentage": count / len(self._case_learnings) * 100,
                })

        # Pattern 2: Execution mode frequency
        mode_counts = Counter(l["execution_mode"] for l in self._case_learnings)
        for mode, count in mode_counts.items():
            if count >= min_frequency:
                patterns.append({
                    "type": "execution_mode_frequency",
                    "mode": mode,
                    "count": count,
                    "percentage": count / len(self._case_learnings) * 100,
                })

        # Pattern 3: Agent combinations that produce high confidence
        high_confidence_combos = Counter()
        for learning in self._case_learnings:
            quality = learning.get("agent_quality", {})
            high_conf_agents = [
                a for a, q in quality.items()
                if q.get("confidence", 0) > 0.7
            ]
            if high_conf_agents:
                high_confidence_combos[tuple(sorted(high_conf_agents))] += 1

        for combo, count in high_confidence_combos.items():
            if count >= min_frequency:
                patterns.append({
                    "type": "high_confidence_agents",
                    "agents": list(combo),
                    "count": count,
                })

        self.last_run["pattern_detection"] = datetime.utcnow().isoformat()
        logger.info(f"Detected {len(patterns)} patterns from {len(self._case_learnings)} cases")
        return patterns

    def get_route_effectiveness(self) -> Dict[str, Any]:
        """Get route effectiveness insights."""
        total = sum(self._route_stats.values())
        if total == 0:
            return {"total_cases": 0, "routes": {}}

        return {
            "total_cases": total,
            "routes": {
                route: {
                    "count": count,
                    "percentage": round(count / total * 100, 1),
                }
                for route, count in self._route_stats.most_common()
            },
        }

    def get_prompt_performance(self) -> Dict[str, Any]:
        """Get prompt performance insights from case learnings."""
        if not self._case_learnings:
            return {"total_cases": 0, "agents": {}}

        agent_stats: Dict[str, Dict[str, Any]] = {}
        for learning in self._case_learnings:
            for agent, quality in learning.get("agent_quality", {}).items():
                if agent not in agent_stats:
                    agent_stats[agent] = {
                        "total_runs": 0,
                        "total_confidence": 0.0,
                        "produced_output_count": 0,
                    }
                agent_stats[agent]["total_runs"] += 1
                agent_stats[agent]["total_confidence"] += quality.get("confidence", 0)
                if quality.get("produced_output", False):
                    agent_stats[agent]["produced_output_count"] += 1

        # Calculate averages
        for agent, stats in agent_stats.items():
            runs = stats["total_runs"]
            stats["avg_confidence"] = round(stats["total_confidence"] / runs, 3) if runs > 0 else 0
            stats["output_rate"] = round(stats["produced_output_count"] / runs, 3) if runs > 0 else 0

        return {
            "total_cases": len(self._case_learnings),
            "agents": agent_stats,
        }

    # ── Prompt Evolution (Task 35) ───────────────────────────────────────────

    async def run_prompt_optimization(self, prompt_names: List[str]) -> Dict[str, Any]:
        """
        Run prompt optimization for specified prompts.

        Uses prompt_optimizer to create improved variants based on
        prompt performance data.
        """
        if not self.prompt_optimizer:
            return {"status": "skipped", "reason": "prompt_optimizer not configured"}

        results = []
        performance = self.get_prompt_performance()

        for name in prompt_names:
            agent_perf = performance.get("agents", {}).get(name, {})
            avg_conf = agent_perf.get("avg_confidence", 0.5)

            # Only optimize prompts with low average confidence
            if avg_conf > 0.8:
                results.append({"prompt": name, "status": "skipped", "reason": "already high performance"})
                continue

            try:
                from app.services.prompt_store import get_prompt
                prompt_data = get_prompt(name)
                if not prompt_data:
                    continue

                goal = f"Improve output quality (current avg confidence: {avg_conf:.2f})"
                variant = await self.prompt_optimizer.create_prompt_variant(
                    name, prompt_data["content"], goal
                )
                results.append({"prompt": name, "status": "variant_created", "variant_id": variant["id"]})
            except Exception as e:
                logger.error(f"Failed to optimize prompt {name}: {e}")
                results.append({"prompt": name, "status": "error", "error": str(e)})

        self.last_run["prompt_optimization"] = datetime.utcnow().isoformat()
        return {"results": results, "timestamp": self.last_run["prompt_optimization"]}

    def get_active_prompt(self, prompt_name: str) -> Optional[str]:
        """
        Get the active production prompt text, if one has been promoted.

        Args:
            prompt_name: Prompt name (e.g., "research", "verifier")

        Returns:
            Production prompt text, or None if no production version exists
        """
        if not self.prompt_optimizer:
            return None

        production = self.prompt_optimizer._get_production_variant(prompt_name)
        if production:
            return production.get("prompt_text")
        return None

    # ── Skill Distillation (Task 36) ─────────────────────────────────────────

    async def run_skill_distillation(self, min_frequency: int = 3) -> Dict[str, Any]:
        """
        Run skill distillation from recent case patterns.
        """
        if not self.skill_distiller:
            return {"status": "skipped", "reason": "skill_distiller not configured"}

        # Use in-memory case learnings as source data
        candidates = self.skill_distiller.detect_skill_candidates(
            self._case_learnings, min_frequency=min_frequency
        )

        skills_created = []
        for candidate in candidates[:5]:
            example_cases = [
                l for l in self._case_learnings
                if l.get("domain") == candidate.get("domain")
            ][:3]
            try:
                skill = await self.skill_distiller.distill_skill(candidate, example_cases)
                skills_created.append(skill)
            except Exception as e:
                logger.error(f"Failed to distill skill: {e}")

        self.last_run["skill_distillation"] = datetime.utcnow().isoformat()

        return {
            "candidates_found": len(candidates),
            "skills_created": len(skills_created),
            "skills": skills_created,
            "timestamp": self.last_run["skill_distillation"],
        }

    # ── Trust & Freshness (Task 37) ──────────────────────────────────────────

    async def run_freshness_refresh(self) -> Dict[str, Any]:
        """
        Check freshness of all knowledge items and flag stale ones.
        """
        if not self.trust_manager:
            return {"status": "skipped", "reason": "trust_manager not configured"}

        all_items = self.knowledge_store.list_all()
        stale = self.trust_manager.get_stale_items(all_items, threshold=0.3)
        recommendations = self.trust_manager.recommend_refresh(stale, max_recommendations=10)

        # Update freshness scores
        for item in all_items:
            freshness = self.trust_manager.calculate_freshness(item)
            item_id = item.get("id")
            if item_id:
                self.trust_manager.update_freshness(item_id, freshness)

        self.last_run["freshness_refresh"] = datetime.utcnow().isoformat()

        return {
            "total_items": len(all_items),
            "stale_items": len(stale),
            "refresh_recommendations": len(recommendations),
            "recommendations": recommendations,
            "timestamp": self.last_run["freshness_refresh"],
        }

    # ── Status & Insights ────────────────────────────────────────────────────

    def get_status(self) -> Dict[str, Any]:
        """Get learning engine status."""
        storage_stats = self.knowledge_store.get_storage_stats()

        return {
            "storage": storage_stats,
            "last_run": self.last_run,
            "enabled": True,
            "cases_learned": len(self._case_learnings),
            "components": {
                "knowledge_store": True,
                "knowledge_ingestor": True,
                "prompt_optimizer": self.prompt_optimizer is not None,
                "skill_distiller": self.skill_distiller is not None,
                "trust_manager": self.trust_manager is not None,
            },
        }

    def get_insights(self) -> Dict[str, Any]:
        """Get comprehensive learning insights."""
        recent_items = self.knowledge_store.list_all(limit=10)

        return {
            "recent_knowledge": [
                {
                    "id": item.get("id"),
                    "title": item.get("title"),
                    "source": item.get("source"),
                    "saved_at": item.get("saved_at"),
                }
                for item in recent_items
            ],
            "storage_stats": self.knowledge_store.get_storage_stats(),
            "route_effectiveness": self.get_route_effectiveness(),
            "prompt_performance": self.get_prompt_performance(),
            "patterns": self.detect_patterns(),
        }