| """ | |
| Core learning engine that coordinates all learning activities. | |
| Orchestrates knowledge ingestion, experience learning, prompt evolution, | |
| skill distillation, trust management, and freshness management. | |
| """ | |
| import logging | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| from collections import Counter | |
| from .knowledge_ingestor import KnowledgeIngestor | |
| from .knowledge_store import KnowledgeStore | |
| from .prompt_optimizer import PromptOptimizer | |
| from .skill_distiller import SkillDistiller | |
| from .trust_manager import TrustManager | |
| logger = logging.getLogger(__name__) | |

class LearningEngine:
    """Coordinates all learning activities."""

    def __init__(
        self,
        knowledge_store: KnowledgeStore,
        knowledge_ingestor: KnowledgeIngestor,
        prompt_optimizer: Optional[PromptOptimizer] = None,
        skill_distiller: Optional[SkillDistiller] = None,
        trust_manager: Optional[TrustManager] = None,
    ):
        self.knowledge_store = knowledge_store
        self.knowledge_ingestor = knowledge_ingestor
        self.prompt_optimizer = prompt_optimizer
        self.skill_distiller = skill_distiller
        self.trust_manager = trust_manager
        self.last_run: Dict[str, str] = {}
        # In-memory learning metadata (flushed periodically)
        self._case_learnings: List[Dict[str, Any]] = []
        self._route_stats: Counter = Counter()
        self._provider_stats: Dict[str, Dict[str, int]] = {}
        self._prompt_performance: Dict[str, Dict[str, Any]] = {}

    # ── Knowledge Ingestion ──────────────────────────────────────────────────

    async def run_knowledge_ingestion(self, topics: List[str]) -> Dict[str, Any]:
        """
        Run knowledge ingestion for specified topics.

        Args:
            topics: List of topics to ingest knowledge about

        Returns:
            Ingestion results
        """
        logger.info(f"Running knowledge ingestion for topics: {topics}")
        total_items = 0
        results = []
        for topic in topics:
            search_items = await self.knowledge_ingestor.ingest_from_search(topic)
            for item in search_items:
                self.knowledge_store.save_knowledge(item)
                total_items += 1
            news_items = await self.knowledge_ingestor.ingest_from_news(topic)
            for item in news_items:
                self.knowledge_store.save_knowledge(item)
                total_items += 1
            results.append({
                "topic": topic,
                "search_items": len(search_items),
                "news_items": len(news_items),
            })
        self.last_run["knowledge_ingestion"] = datetime.utcnow().isoformat()
        logger.info(f"Knowledge ingestion complete: {total_items} items ingested")
        return {
            "total_items": total_items,
            "results": results,
            "timestamp": self.last_run["knowledge_ingestion"],
        }
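
    # Example (hypothetical call; the return keys follow the dict built above,
    # the values are illustrative):
    #
    #   result = await engine.run_knowledge_ingestion(["rust async", "llm evals"])
    #   result["total_items"]    # e.g. 12
    #   result["results"][0]     # {"topic": "rust async",
    #                            #  "search_items": 4, "news_items": 2}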

    async def run_cleanup(self, expiration_days: int = 30) -> Dict[str, Any]:
        """Run cleanup of expired knowledge."""
        logger.info("Running knowledge cleanup")
        deleted_count = self.knowledge_store.delete_expired_knowledge(expiration_days)
        self.last_run["cleanup"] = datetime.utcnow().isoformat()
        return {
            "deleted_count": deleted_count,
            "timestamp": self.last_run["cleanup"],
        }

    # ── Experience Learning (Task 34) ────────────────────────────────────────

    def learn_from_case(self, case_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract learning metadata from a completed case execution.

        Args:
            case_data: Complete case payload

        Returns:
            Learning metadata extracted
        """
        case_id = case_data.get("case_id", "unknown")
        logger.info(f"Learning from case {case_id}")
        route = case_data.get("route", {})
        outputs = case_data.get("outputs") or self._derive_outputs(case_data)

        # 1. Track route effectiveness
        route_key = f"{route.get('domain_pack', 'general')}:{route.get('execution_mode', 'standard')}"
        self._route_stats[route_key] += 1

        # 2. Track which agents produced useful output
        agents_used = []
        agent_quality = {}
        for output in outputs:
            if isinstance(output, dict):
                agent_name = output.get("agent", "unknown")
                summary = output.get("summary", "")
                confidence = output.get("confidence", 0.0)
                agents_used.append(agent_name)
                agent_quality[agent_name] = {
                    "output_length": len(summary),
                    "confidence": confidence,
                    "produced_output": len(summary) > 10,
                }

        # 3. Build learning record
        learning = {
            "case_id": case_id,
            "route": route,
            "agents_used": agents_used,
            "agent_quality": agent_quality,
            "domain": route.get("domain_pack", "general"),
            "complexity": route.get("complexity", "medium"),
            "execution_mode": route.get("execution_mode", "standard"),
            "learned_at": datetime.utcnow().isoformat(),
        }
        self._case_learnings.append(learning)
        # Keep only the last 500 learnings in memory
        if len(self._case_learnings) > 500:
            self._case_learnings = self._case_learnings[-500:]

        # 4. Update trust scores for sources mentioned in research
        if self.trust_manager:
            for output in outputs:
                if isinstance(output, dict) and output.get("agent") == "research":
                    sources = output.get("details", {}).get("sources", [])
                    for source in sources:
                        if isinstance(source, str):
                            self.trust_manager.update_trust(source, True, weight=0.5)

        logger.info(f"Learned from case {case_id}: route={route_key}, agents={agents_used}")
        return learning
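
    # Example (hypothetical payload; the keys are exactly what learn_from_case
    # reads above; real case payloads may carry additional fields):
    #
    #   engine.learn_from_case({
    #       "case_id": "case-42",
    #       "route": {"domain_pack": "finance", "execution_mode": "deep",
    #                 "complexity": "high"},
    #       "outputs": [
    #           {"agent": "research", "summary": "Found 3 primary sources ...",
    #            "confidence": 0.82,
    #            "details": {"sources": ["sec.gov", "reuters.com"]}},
    #       ],
    #   })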

    def _derive_outputs(self, case_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Backfill agent outputs from the current case shape."""
        outputs: List[Dict[str, Any]] = []

        def _append(agent: str, details: Dict[str, Any]) -> None:
            if not isinstance(details, dict) or not details:
                return
            summary = (
                details.get("summary")
                or details.get("response")
                or details.get("estimated_output")
                or ""
            )
            outputs.append(
                {
                    "agent": agent,
                    "summary": str(summary),
                    "confidence": float(details.get("confidence", 0.0) or 0.0),
                    "details": details,
                }
            )

        _append("research", case_data.get("research", {}))
        _append("planner", case_data.get("planner", {}))
        _append("verifier", case_data.get("verifier", {}))
        _append("synthesizer", case_data.get("final", {}))
        return outputs
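
    # Example (assumed legacy shape; _derive_outputs backfills output records
    # from the top-level "research"/"planner"/"verifier"/"final" sections):
    #
    #   self._derive_outputs({"research": {"summary": "...", "confidence": 0.7},
    #                         "final": {"response": "...", "confidence": 0.9}})
    #   # -> [{"agent": "research", "summary": "...", "confidence": 0.7, ...},
    #   #     {"agent": "synthesizer", "summary": "...", "confidence": 0.9, ...}]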

    def detect_patterns(self, min_frequency: int = 3) -> List[Dict[str, Any]]:
        """
        Detect patterns in recent case executions.

        Args:
            min_frequency: Minimum occurrences to count as a pattern

        Returns:
            List of detected patterns
        """
        if len(self._case_learnings) < min_frequency:
            return []
        patterns = []

        # Pattern 1: Domain frequency
        domain_counts = Counter(learning["domain"] for learning in self._case_learnings)
        for domain, count in domain_counts.items():
            if count >= min_frequency:
                patterns.append({
                    "type": "domain_frequency",
                    "domain": domain,
                    "count": count,
                    "percentage": count / len(self._case_learnings) * 100,
                })

        # Pattern 2: Execution mode frequency
        mode_counts = Counter(learning["execution_mode"] for learning in self._case_learnings)
        for mode, count in mode_counts.items():
            if count >= min_frequency:
                patterns.append({
                    "type": "execution_mode_frequency",
                    "mode": mode,
                    "count": count,
                    "percentage": count / len(self._case_learnings) * 100,
                })

        # Pattern 3: Agent combinations that produce high confidence
        high_confidence_combos: Counter = Counter()
        for learning in self._case_learnings:
            quality = learning.get("agent_quality", {})
            high_conf_agents = [
                a for a, q in quality.items()
                if q.get("confidence", 0) > 0.7
            ]
            if high_conf_agents:
                high_confidence_combos[tuple(sorted(high_conf_agents))] += 1
        for combo, count in high_confidence_combos.items():
            if count >= min_frequency:
                patterns.append({
                    "type": "high_confidence_agents",
                    "agents": list(combo),
                    "count": count,
                })

        self.last_run["pattern_detection"] = datetime.utcnow().isoformat()
        logger.info(f"Detected {len(patterns)} patterns from {len(self._case_learnings)} cases")
        return patterns
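
    # Example (illustrative output; the shapes match the pattern dicts built
    # above, the counts are made up):
    #
    #   engine.detect_patterns(min_frequency=3)
    #   # -> [{"type": "domain_frequency", "domain": "finance",
    #   #      "count": 5, "percentage": 50.0},
    #   #     {"type": "high_confidence_agents",
    #   #      "agents": ["research", "verifier"], "count": 3}]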

    def get_route_effectiveness(self) -> Dict[str, Any]:
        """Get route effectiveness insights."""
        total = sum(self._route_stats.values())
        if total == 0:
            return {"total_cases": 0, "routes": {}}
        return {
            "total_cases": total,
            "routes": {
                route: {
                    "count": count,
                    "percentage": round(count / total * 100, 1),
                }
                for route, count in self._route_stats.most_common()
            },
        }

    def get_prompt_performance(self) -> Dict[str, Any]:
        """Get prompt performance insights from case learnings."""
        if not self._case_learnings:
            return {"total_cases": 0, "agents": {}}
        agent_stats: Dict[str, Dict[str, Any]] = {}
        for learning in self._case_learnings:
            for agent, quality in learning.get("agent_quality", {}).items():
                if agent not in agent_stats:
                    agent_stats[agent] = {
                        "total_runs": 0,
                        "total_confidence": 0.0,
                        "produced_output_count": 0,
                    }
                agent_stats[agent]["total_runs"] += 1
                agent_stats[agent]["total_confidence"] += quality.get("confidence", 0)
                if quality.get("produced_output", False):
                    agent_stats[agent]["produced_output_count"] += 1
        # Calculate averages
        for agent, stats in agent_stats.items():
            runs = stats["total_runs"]
            stats["avg_confidence"] = round(stats["total_confidence"] / runs, 3) if runs > 0 else 0
            stats["output_rate"] = round(stats["produced_output_count"] / runs, 3) if runs > 0 else 0
        return {
            "total_cases": len(self._case_learnings),
            "agents": agent_stats,
        }

    # ── Prompt Evolution (Task 35) ───────────────────────────────────────────

    async def run_prompt_optimization(self, prompt_names: List[str]) -> Dict[str, Any]:
        """
        Run prompt optimization for specified prompts.

        Uses prompt_optimizer to create improved variants based on
        prompt performance data.
        """
        if not self.prompt_optimizer:
            return {"status": "skipped", "reason": "prompt_optimizer not configured"}
        results = []
        performance = self.get_prompt_performance()
        for name in prompt_names:
            agent_perf = performance.get("agents", {}).get(name, {})
            avg_conf = agent_perf.get("avg_confidence", 0.5)
            # Only optimize prompts with low average confidence
            if avg_conf > 0.8:
                results.append({"prompt": name, "status": "skipped", "reason": "already high performance"})
                continue
            try:
                from app.services.prompt_store import get_prompt
                prompt_data = get_prompt(name)
                if not prompt_data:
                    continue
                goal = f"Improve output quality (current avg confidence: {avg_conf:.2f})"
                variant = await self.prompt_optimizer.create_prompt_variant(
                    name, prompt_data["content"], goal
                )
                results.append({"prompt": name, "status": "variant_created", "variant_id": variant["id"]})
            except Exception as e:
                logger.error(f"Failed to optimize prompt {name}: {e}")
                results.append({"prompt": name, "status": "error", "error": str(e)})
        self.last_run["prompt_optimization"] = datetime.utcnow().isoformat()
        return {"results": results, "timestamp": self.last_run["prompt_optimization"]}

    def get_active_prompt(self, prompt_name: str) -> Optional[str]:
        """
        Get the active production prompt text, if one has been promoted.

        Args:
            prompt_name: Prompt name (e.g., "research", "verifier")

        Returns:
            Production prompt text, or None if no production version exists
        """
        if not self.prompt_optimizer:
            return None
        production = self.prompt_optimizer._get_production_variant(prompt_name)
        if production:
            return production.get("prompt_text")
        return None

    # ── Skill Distillation (Task 36) ─────────────────────────────────────────

    async def run_skill_distillation(self, min_frequency: int = 3) -> Dict[str, Any]:
        """Run skill distillation from recent case patterns."""
        if not self.skill_distiller:
            return {"status": "skipped", "reason": "skill_distiller not configured"}
        # Use in-memory case learnings as source data
        candidates = self.skill_distiller.detect_skill_candidates(
            self._case_learnings, min_frequency=min_frequency
        )
        skills_created = []
        for candidate in candidates[:5]:
            example_cases = [
                learning for learning in self._case_learnings
                if learning.get("domain") == candidate.get("domain")
            ][:3]
            try:
                skill = await self.skill_distiller.distill_skill(candidate, example_cases)
                skills_created.append(skill)
            except Exception as e:
                logger.error(f"Failed to distill skill: {e}")
        self.last_run["skill_distillation"] = datetime.utcnow().isoformat()
        return {
            "candidates_found": len(candidates),
            "skills_created": len(skills_created),
            "skills": skills_created,
            "timestamp": self.last_run["skill_distillation"],
        }
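
    # Example (illustrative; candidate and skill shapes depend on the
    # SkillDistiller implementation):
    #
    #   out = await engine.run_skill_distillation(min_frequency=3)
    #   # -> {"candidates_found": 2, "skills_created": 1,
    #   #     "skills": [...], "timestamp": "..."}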

    # ── Trust & Freshness (Task 37) ──────────────────────────────────────────

    async def run_freshness_refresh(self) -> Dict[str, Any]:
        """Check freshness of all knowledge items and flag stale ones."""
        if not self.trust_manager:
            return {"status": "skipped", "reason": "trust_manager not configured"}
        all_items = self.knowledge_store.list_all()
        stale = self.trust_manager.get_stale_items(all_items, threshold=0.3)
        recommendations = self.trust_manager.recommend_refresh(stale, max_recommendations=10)
        # Update freshness scores
        for item in all_items:
            freshness = self.trust_manager.calculate_freshness(item)
            item_id = item.get("id")
            if item_id:
                self.trust_manager.update_freshness(item_id, freshness)
        self.last_run["freshness_refresh"] = datetime.utcnow().isoformat()
        return {
            "total_items": len(all_items),
            "stale_items": len(stale),
            "refresh_recommendations": len(recommendations),
            "recommendations": recommendations,
            "timestamp": self.last_run["freshness_refresh"],
        }

    # ── Status & Insights ────────────────────────────────────────────────────

    def get_status(self) -> Dict[str, Any]:
        """Get learning engine status."""
        storage_stats = self.knowledge_store.get_storage_stats()
        return {
            "storage": storage_stats,
            "last_run": self.last_run,
            "enabled": True,
            "cases_learned": len(self._case_learnings),
            "components": {
                "knowledge_store": True,
                "knowledge_ingestor": True,
                "prompt_optimizer": self.prompt_optimizer is not None,
                "skill_distiller": self.skill_distiller is not None,
                "trust_manager": self.trust_manager is not None,
            },
        }

    def get_insights(self) -> Dict[str, Any]:
        """Get comprehensive learning insights."""
        recent_items = self.knowledge_store.list_all(limit=10)
        return {
            "recent_knowledge": [
                {
                    "id": item.get("id"),
                    "title": item.get("title"),
                    "source": item.get("source"),
                    "saved_at": item.get("saved_at"),
                }
                for item in recent_items
            ],
            "storage_stats": self.knowledge_store.get_storage_stats(),
            "route_effectiveness": self.get_route_effectiveness(),
            "prompt_performance": self.get_prompt_performance(),
            "patterns": self.detect_patterns(),
        }
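

# ── Usage sketch ─────────────────────────────────────────────────────────────
# A minimal wiring example, kept as a comment because the KnowledgeStore and
# KnowledgeIngestor constructor signatures are assumptions; adapt as needed.
#
#   import asyncio
#
#   async def main():
#       store = KnowledgeStore()
#       ingestor = KnowledgeIngestor()
#       engine = LearningEngine(store, ingestor)
#       await engine.run_knowledge_ingestion(["agent frameworks"])
#       engine.learn_from_case({"case_id": "demo", "route": {}})
#       print(engine.get_status())
#
#   asyncio.run(main())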