# Spaces: Running Running  (page-status banner captured with this listing; apparently not part of the module)
"""
Adaptive Intelligence Engine for MiroOrg v2.

This is the system's "meta-brain" — it learns from ALL past cases to:
1. Determine optimal pipeline depth per query type
2. Build domain expertise that accumulates over time
3. Recognize cross-case patterns no single query reveals
4. Develop a consistent analytical personality

This is what makes MiroOrg genuinely unique — it doesn't just answer questions,
it builds institutional intelligence that compounds with every interaction.
"""
import json
import logging
import os
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional
# Directory that holds the persisted adaptive-intelligence JSON files:
# <directory two levels above this file>/data/adaptive. It is created at
# import time (including missing parents) so later reads and writes can
# assume it exists.
DATA_DIR = Path(__file__).parent.parent / "data" / "adaptive"
DATA_DIR.mkdir(parents=True, exist_ok=True)
class DomainExpertise:
    """Tracks and accumulates expertise for one query domain over time.

    Every completed case passed to ``update_from_case`` adjusts:

    * ``success_rate`` / ``typical_confidence`` - exponential moving averages
      of the case's final confidence (seeded at 0.0 and 0.5 respectively),
    * ``key_entities`` - counts of capitalized words found in research facts,
    * ``common_patterns`` - counts of ``"complexity:<level>"`` route labels,
    * ``trusted_sources`` - a per-source score nudged toward the confidence
      of each case that cited the source (unseen sources start at 0.5).

    The state round-trips through ``to_dict`` / ``from_dict`` using plain
    JSON-compatible types.
    """

    # Weight given to the newest case in the exponential moving averages.
    _EMA_ALPHA = 0.3
    # Fraction of the gap to the case confidence by which a source's trust moves.
    _TRUST_STEP = 0.05
    # Confidence assumed when a case carries none (or an unusable one).
    _DEFAULT_CONFIDENCE = 0.5
    # Punctuation stripped from both ends of a word before the entity check.
    _WORD_PUNCTUATION = ".,;:()\"'"

    def __init__(self, domain: str):
        """Create an empty expertise record for ``domain``."""
        self.domain = domain
        self.case_count = 0
        self.success_rate = 0.0
        self.key_entities: Dict[str, int] = defaultdict(int)
        self.common_patterns: Dict[str, int] = defaultdict(int)
        self.trusted_sources: Dict[str, float] = defaultdict(lambda: 0.5)
        self.typical_confidence = 0.5
        self.last_updated = time.time()

    @classmethod
    def _as_confidence(cls, value: Any) -> float:
        """Return ``value`` as a float, or the default when it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return cls._DEFAULT_CONFIDENCE

    @staticmethod
    def _source_name(source: Any) -> str:
        """Reduce a source to its host part if it is a URL, else return it as is.

        Non-string sources yield ``""`` so callers can skip them.
        """
        if not isinstance(source, str):
            return ""
        if "://" in source:
            return source.split("://")[-1].split("/")[0]
        return source

    def update_from_case(self, case: Dict) -> None:
        """Absorb learning from a completed case.

        Args:
            case: Mapping that may contain ``"final"`` (with ``"confidence"``),
                ``"research"`` (with ``"key_facts"`` and ``"sources"``) and
                ``"route"`` (with ``"complexity"``). Missing or ``None``
                sections are treated as empty.
        """
        self.case_count += 1

        # ``or {}`` also covers keys that are present but set to None.
        final = case.get("final") or {}
        research = case.get("research") or {}
        route = case.get("route") or {}

        # Both running averages use the same exponential moving average step.
        confidence = self._as_confidence(final.get("confidence"))
        alpha = self._EMA_ALPHA
        self.success_rate = alpha * confidence + (1 - alpha) * self.success_rate
        self.typical_confidence = (
            alpha * confidence + (1 - alpha) * self.typical_confidence
        )

        # Simple entity extraction: capitalized words longer than two
        # characters once surrounding punctuation is stripped.
        for fact in research.get("key_facts") or []:
            if not isinstance(fact, str):
                continue
            for word in fact.split():
                cleaned = word.strip(self._WORD_PUNCTUATION)
                if len(cleaned) > 2 and cleaned[0].isupper():
                    self.key_entities[cleaned] += 1

        # Track patterns from the route.
        complexity = route.get("complexity", "medium")
        self.common_patterns[f"complexity:{complexity}"] += 1

        # Move each cited source's trust a small step toward this case's
        # confidence.
        for source in research.get("sources") or []:
            name = self._source_name(source)
            if name:
                current = self.trusted_sources[name]
                self.trusted_sources[name] = current + self._TRUST_STEP * (
                    confidence - current
                )

        self.last_updated = time.time()

    def get_expertise_summary(self) -> Dict:
        """Get a summary of domain expertise for prompt enrichment.

        Returns:
            A dict with the domain, case count, the two averages rounded to
            two decimals, the 10 most frequent entities, the 5 most trusted
            sources (with rounded trust) and all pattern counts.
        """
        top_entities = sorted(
            self.key_entities.items(), key=lambda item: item[1], reverse=True
        )[:10]
        top_sources = sorted(
            self.trusted_sources.items(), key=lambda item: item[1], reverse=True
        )[:5]
        return {
            "domain": self.domain,
            "case_count": self.case_count,
            "success_rate": round(self.success_rate, 2),
            "typical_confidence": round(self.typical_confidence, 2),
            "key_entities": [entity for entity, _ in top_entities],
            "trusted_sources": [
                {"source": source, "trust": round(trust, 2)}
                for source, trust in top_sources
            ],
            "common_patterns": dict(self.common_patterns),
        }

    def to_dict(self) -> Dict:
        """Return the full (unrounded) state as plain dicts for persistence."""
        return {
            "domain": self.domain,
            "case_count": self.case_count,
            "success_rate": self.success_rate,
            "key_entities": dict(self.key_entities),
            "common_patterns": dict(self.common_patterns),
            "trusted_sources": dict(self.trusted_sources),
            "typical_confidence": self.typical_confidence,
            "last_updated": self.last_updated,
        }

    # Alternate constructor: must be a classmethod so that
    # ``DomainExpertise.from_dict(data)`` binds ``cls`` automatically.
    @classmethod
    def from_dict(cls, data: Dict) -> "DomainExpertise":
        """Rebuild an instance from the mapping produced by ``to_dict``.

        Args:
            data: Mapping with at least a ``"domain"`` key; every other field
                falls back to the same default ``__init__`` uses.

        Raises:
            KeyError: If ``"domain"`` is missing.
        """
        expertise = cls(data["domain"])
        expertise.case_count = data.get("case_count", 0)
        expertise.success_rate = data.get("success_rate", 0.0)
        expertise.key_entities = defaultdict(int, data.get("key_entities") or {})
        expertise.common_patterns = defaultdict(
            int, data.get("common_patterns") or {}
        )
        expertise.trusted_sources = defaultdict(
            lambda: 0.5, data.get("trusted_sources") or {}
        )
        expertise.typical_confidence = data.get("typical_confidence", 0.5)
        expertise.last_updated = data.get("last_updated", time.time())
        return expertise
class CrossCasePatternRecognizer:
    """Finds patterns across ALL cases that no single query reveals.

    Collected state:

    * ``query_complexity_trend`` - one ``{domain, complexity, timestamp}``
      entry per case,
    * ``time_patterns`` - elapsed values grouped by domain,
    * ``source_effectiveness`` - per-source ``hits`` (cases with confidence
      above 0.7) and ``total`` citation counts,
    * ``domain_cooccurrence`` - nested counters that this class persists and
      restores but does not itself update.
    """

    # A citation counts as a "hit" when the case confidence exceeds this.
    _HIT_CONFIDENCE = 0.7
    # Sources need at least this many citations to appear in the insights.
    _MIN_SOURCE_SAMPLES = 2
    # Confidence assumed when a case carries none (or an unusable one).
    _DEFAULT_CONFIDENCE = 0.5

    def __init__(self):
        """Create a recognizer with empty statistics."""
        self.domain_cooccurrence: Dict[str, Dict[str, int]] = defaultdict(
            lambda: defaultdict(int)
        )
        self.query_complexity_trend: List[Dict] = []
        self.source_effectiveness: Dict[str, Dict[str, int]] = defaultdict(
            lambda: {"hits": 0, "total": 0}
        )
        self.time_patterns: Dict[str, List[float]] = defaultdict(list)

    @classmethod
    def _as_confidence(cls, value: Any) -> float:
        """Return ``value`` as a float, or the default when it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return cls._DEFAULT_CONFIDENCE

    @staticmethod
    def _source_name(source: Any) -> str:
        """Reduce a source to its host part if it is a URL, else return it as is.

        Non-string sources yield ``""`` so callers can skip them.
        """
        if not isinstance(source, str):
            return ""
        if "://" in source:
            return source.split("://")[-1].split("/")[0]
        return source

    def update_from_case(self, case: Dict, elapsed: float) -> None:
        """Absorb a case into the pattern statistics.

        Args:
            case: Mapping that may contain ``"route"`` (``"domain"``,
                ``"complexity"``), ``"research"`` (``"sources"``) and
                ``"final"`` (``"confidence"``). Missing or ``None`` sections
                are treated as empty.
            elapsed: Value recorded for the case's domain in
                ``time_patterns``.
        """
        # ``or {}`` also covers keys that are present but set to None.
        route = case.get("route") or {}
        research = case.get("research") or {}
        final = case.get("final") or {}

        domain = route.get("domain", "general")
        complexity = route.get("complexity", "medium")

        # Track complexity trends.
        self.query_complexity_trend.append(
            {
                "domain": domain,
                "complexity": complexity,
                "timestamp": time.time(),
            }
        )

        # Track time patterns.
        self.time_patterns[domain].append(elapsed)

        # Track source effectiveness: every citation counts toward "total",
        # and toward "hits" only when the case ended with high confidence.
        confidence = self._as_confidence(final.get("confidence"))
        hit = 1 if confidence > self._HIT_CONFIDENCE else 0
        for source in research.get("sources") or []:
            name = self._source_name(source)
            if name:
                stats = self.source_effectiveness[name]
                stats["hits"] += hit
                stats["total"] += 1

    def get_insights(self) -> Dict:
        """Get cross-case insights.

        Returns:
            A dict with the hit ratio of every source cited at least twice,
            the mean elapsed value per domain (one decimal), the complexity
            distribution of the 50 most recent trend entries, and the number
            of trend entries currently held.
        """
        # Most effective sources.
        effective_sources = {
            source: round(stats["hits"] / stats["total"], 2)
            for source, stats in self.source_effectiveness.items()
            if stats["total"] >= self._MIN_SOURCE_SAMPLES
        }

        # Average response times by domain (domains without samples omitted).
        avg_times = {
            domain: round(sum(times) / len(times), 1)
            for domain, times in self.time_patterns.items()
            if times
        }

        # Recent complexity distribution; entries restored from disk without
        # a complexity fall back to the same default update_from_case uses.
        complexity_dist: Dict[str, int] = defaultdict(int)
        for entry in self.query_complexity_trend[-50:]:
            complexity_dist[entry.get("complexity", "medium")] += 1

        return {
            "effective_sources": effective_sources,
            "avg_response_times": avg_times,
            "complexity_distribution": dict(complexity_dist),
            "total_cases_analyzed": len(self.query_complexity_trend),
        }

    def to_dict(self) -> Dict:
        """Return a JSON-compatible snapshot of the statistics.

        Only the last 100 trend entries and the last 50 elapsed values per
        domain are included. Inner dicts are copied so the snapshot does not
        alias live state.
        """
        return {
            "domain_cooccurrence": {
                outer: dict(inner)
                for outer, inner in self.domain_cooccurrence.items()
            },
            "query_complexity_trend": self.query_complexity_trend[-100:],
            "source_effectiveness": {
                name: dict(stats)
                for name, stats in self.source_effectiveness.items()
            },
            "time_patterns": {
                domain: times[-50:]
                for domain, times in self.time_patterns.items()
            },
        }

    # Alternate constructor: must be a classmethod so that
    # ``CrossCasePatternRecognizer.from_dict(data)`` binds ``cls``.
    @classmethod
    def from_dict(cls, data: Dict) -> "CrossCasePatternRecognizer":
        """Rebuild a recognizer from the mapping produced by ``to_dict``.

        Missing or ``None`` sections are treated as empty, and entries of an
        unexpected type are skipped.
        """
        recognizer = cls()

        for outer, inner in (data.get("domain_cooccurrence") or {}).items():
            if isinstance(inner, dict):
                for key, count in inner.items():
                    recognizer.domain_cooccurrence[outer][key] = count

        recognizer.query_complexity_trend = [
            entry
            for entry in (data.get("query_complexity_trend") or [])
            if isinstance(entry, dict)
        ]

        for name, stats in (data.get("source_effectiveness") or {}).items():
            if isinstance(stats, dict):
                # Guarantee both counters exist; later updates index them
                # directly.
                recognizer.source_effectiveness[name] = {
                    "hits": 0,
                    "total": 0,
                    **stats,
                }

        for domain, times in (data.get("time_patterns") or {}).items():
            if isinstance(times, list):
                recognizer.time_patterns[domain] = list(times)

        return recognizer
class AdaptiveIntelligence:
    """The meta-brain of MiroOrg — learns, adapts, and accumulates intelligence.

    Holds one ``DomainExpertise`` per domain, a shared
    ``CrossCasePatternRecognizer``, a dict of "personality" parameters and a
    running case counter. State is loaded from ``data_dir`` on construction
    and written back as four JSON files (``domain_expertise.json``,
    ``patterns.json``, ``personality.json``, ``meta.json``).
    """

    # Persist to disk each time the case counter reaches a multiple of this.
    _SAVE_EVERY = 5
    # Personality adaptation is skipped until this many cases have been seen.
    _MIN_CASES_TO_ADAPT = 10
    # Step by which a personality parameter moves per adaptation.
    _ADAPT_STEP = 0.02

    def __init__(self, data_dir: Path = DATA_DIR):
        """Set up default state, then overlay whatever is persisted.

        Args:
            data_dir: Directory for the JSON state files; created if missing.
        """
        self.data_dir = data_dir
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.domain_expertise: Dict[str, DomainExpertise] = {}
        self.pattern_recognizer = CrossCasePatternRecognizer()
        self.system_personality = {
            "analytical_depth": 0.7,  # How deep the analysis goes (0-1)
            "confidence_threshold": 0.6,  # Minimum confidence to state something as fact
            "skepticism_level": 0.3,  # How skeptical of unverified claims (0-1)
            "actionability_focus": 0.8,  # How much to focus on actionable insights (0-1)
            "deliberation_threshold": 0.5,  # Controls internal reflection loops (0-1)
            "cognitive_breadth": 0.5,  # Controls diversity of search/strategy (0-1)
            "socratic_depth": 0.4,  # How critical the verification is (0-1)
        }
        self.total_cases = 0
        self._load()

    def _read_json(self, filename: str) -> Any:
        """Return the parsed content of ``data_dir/filename``.

        Returns ``None`` when the file is absent or cannot be read/parsed;
        the failure is logged rather than raised so loading stays
        best-effort.
        """
        path = self.data_dir / filename
        if not path.exists():
            return None
        try:
            with open(path, encoding="utf-8") as handle:
                return json.load(handle)
        except (OSError, ValueError) as exc:  # ValueError covers bad JSON
            logging.getLogger(__name__).warning(
                "Ignoring unreadable adaptive state file %s: %s", path, exc
            )
            return None

    def _write_json(self, filename: str, payload: Any) -> None:
        """Write ``payload`` as JSON to ``data_dir/filename``.

        The text is serialized first and written to a sibling ``.tmp`` file
        that then replaces the target, so a failure part-way through does
        not leave a truncated state file behind.
        """
        text = json.dumps(payload, indent=2)
        target = self.data_dir / filename
        scratch = target.with_name(target.name + ".tmp")
        scratch.write_text(text, encoding="utf-8")
        scratch.replace(target)

    def _load(self) -> None:
        """Load persisted intelligence, keeping defaults for anything unusable."""
        log = logging.getLogger(__name__)
        # Errors a from_dict call can raise on malformed persisted data.
        bad_data = (KeyError, TypeError, AttributeError, ValueError)

        entries = self._read_json("domain_expertise.json")
        if isinstance(entries, list):
            for entry in entries:
                try:
                    expertise = DomainExpertise.from_dict(entry)
                except bad_data as exc:
                    # Skip only the bad entry; the remaining ones still load.
                    log.warning("Skipping invalid domain expertise entry: %r", exc)
                    continue
                self.domain_expertise[expertise.domain] = expertise

        patterns = self._read_json("patterns.json")
        if isinstance(patterns, dict):
            try:
                self.pattern_recognizer = CrossCasePatternRecognizer.from_dict(
                    patterns
                )
            except bad_data as exc:
                log.warning("Ignoring invalid pattern data: %r", exc)

        personality = self._read_json("personality.json")
        if isinstance(personality, dict):
            # Overlay onto the defaults instead of replacing them, so keys
            # absent from the file keep their default values.
            self.system_personality.update(personality)

        meta = self._read_json("meta.json")
        if isinstance(meta, dict):
            self.total_cases = meta.get("total_cases", 0)

    def _save(self) -> None:
        """Persist intelligence to disk; failures are logged, never raised."""
        try:
            self._write_json(
                "domain_expertise.json",
                [e.to_dict() for e in self.domain_expertise.values()],
            )
            self._write_json("patterns.json", self.pattern_recognizer.to_dict())
            self._write_json("personality.json", self.system_personality)
            self._write_json("meta.json", {"total_cases": self.total_cases})
        except Exception as exc:  # best-effort boundary: log and carry on
            logging.getLogger(__name__).error(
                "Failed to save adaptive intelligence: %s", exc
            )

    def learn_from_case(self, case: Dict, elapsed: float) -> None:
        """Absorb a completed case into the system's intelligence.

        Args:
            case: Case mapping; its ``"route"`` → ``"domain"`` (default
                ``"general"``) selects which domain expertise is updated.
            elapsed: Forwarded to the pattern recognizer together with
                ``case``.
        """
        self.total_cases += 1
        route = case.get("route") or {}
        domain = route.get("domain", "general")

        # Update domain expertise, creating the record on first sight.
        expertise = self.domain_expertise.get(domain)
        if expertise is None:
            expertise = self.domain_expertise[domain] = DomainExpertise(domain)
        expertise.update_from_case(case)

        # Update cross-case patterns.
        self.pattern_recognizer.update_from_case(case, elapsed)

        # Adapt system personality based on accumulated experience.
        self._adapt_personality()

        # Save periodically rather than on every case.
        if self.total_cases % self._SAVE_EVERY == 0:
            self._save()

    def _adapt_personality(self) -> None:
        """Adapt system personality based on accumulated experience.

        Does nothing before 10 cases. Afterwards the mean ``success_rate``
        over domains with at least one case decides the adjustment: above
        0.75 raises ``analytical_depth`` (capped at 0.95); below 0.5 lowers
        it (floored at 0.4) and raises ``skepticism_level`` (capped at 0.7).
        """
        if self.total_cases < self._MIN_CASES_TO_ADAPT:
            return  # Need more data

        rates = [
            expertise.success_rate
            for expertise in self.domain_expertise.values()
            if expertise.case_count > 0
        ]
        if not rates:
            return
        avg_confidence = sum(rates) / len(rates)

        personality = self.system_personality
        step = self._ADAPT_STEP
        if avg_confidence > 0.75:
            personality["analytical_depth"] = min(
                0.95, personality["analytical_depth"] + step
            )
        elif avg_confidence < 0.5:
            personality["analytical_depth"] = max(
                0.4, personality["analytical_depth"] - step
            )
            personality["skepticism_level"] = min(
                0.7, personality["skepticism_level"] + step
            )

    def get_context_for_query(self, query: str, domain: str) -> Dict:
        """Get accumulated intelligence context for a new query.

        Args:
            query: Accepted for interface compatibility; not used here.
            domain: Domain whose expertise summary is attached when known.

        Returns:
            A dict with the personality, the total case count, the
            cross-case insights and, if ``domain`` has a record,
            its expertise summary under ``"domain_expertise"``.
        """
        context = {
            "system_personality": self.system_personality,
            "total_cases_learned": self.total_cases,
        }
        # Add domain expertise if available.
        expertise = self.domain_expertise.get(domain)
        if expertise is not None:
            context["domain_expertise"] = expertise.get_expertise_summary()
        # Add cross-case insights.
        context["cross_case_insights"] = self.pattern_recognizer.get_insights()
        return context

    def get_full_intelligence_report(self) -> Dict:
        """Get the complete intelligence report across all domains."""
        return {
            "total_cases": self.total_cases,
            "system_personality": self.system_personality,
            "domain_expertise": {
                domain: expertise.get_expertise_summary()
                for domain, expertise in self.domain_expertise.items()
            },
            "cross_case_insights": self.pattern_recognizer.get_insights(),
        }

    def save(self) -> None:
        """Force an immediate save, regardless of the periodic schedule."""
        self._save()
# Global instance, built when this module is imported. Construction uses the
# default DATA_DIR and immediately loads any state persisted there.
adaptive_intelligence = AdaptiveIntelligence()