Janus-backend / backend /app /services /adaptive_intelligence.py
DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Adaptive Intelligence Engine for MiroOrg v2.
This is the system's "meta-brain" — it learns from ALL past cases to:
1. Determine optimal pipeline depth per query type
2. Build domain expertise that accumulates over time
3. Recognize cross-case patterns no single query reveals
4. Develop a consistent analytical personality
This is what makes MiroOrg genuinely unique — it doesn't just answer questions,
it builds institutional intelligence that compounds with every interaction.
"""
import json
import os
import time
from pathlib import Path
from typing import Dict, List, Any, Optional
from collections import defaultdict
DATA_DIR = Path(__file__).parent.parent / "data" / "adaptive"
DATA_DIR.mkdir(parents=True, exist_ok=True)
class DomainExpertise:
"""Tracks and accumulates domain expertise over time."""
def __init__(self, domain: str):
self.domain = domain
self.case_count = 0
self.success_rate = 0.0
self.key_entities: Dict[str, int] = defaultdict(int)
self.common_patterns: Dict[str, int] = defaultdict(int)
self.trusted_sources: Dict[str, float] = defaultdict(lambda: 0.5)
self.typical_confidence = 0.5
self.last_updated = time.time()
def update_from_case(self, case: Dict) -> None:
"""Absorb learning from a completed case."""
self.case_count += 1
# Update success rate (exponential moving average)
final = case.get("final", {})
confidence = final.get("confidence", 0.5)
alpha = 0.3 # Learning rate
self.success_rate = alpha * confidence + (1 - alpha) * self.success_rate
self.typical_confidence = (
alpha * confidence + (1 - alpha) * self.typical_confidence
)
# Extract entities from research
research = case.get("research", {})
for fact in research.get("key_facts", []):
# Simple entity extraction - capitalized words
words = fact.split()
for word in words:
cleaned = word.strip(".,;:()\"'")
if cleaned and cleaned[0].isupper() and len(cleaned) > 2:
self.key_entities[cleaned] += 1
# Track patterns from route
route = case.get("route", {})
complexity = route.get("complexity", "medium")
self.common_patterns[f"complexity:{complexity}"] += 1
# Update source trust
for source in research.get("sources", []):
source_name = (
source.split("://")[-1].split("/")[0] if "://" in source else source
)
if source_name:
current = self.trusted_sources[source_name]
self.trusted_sources[source_name] = current + 0.05 * (
confidence - current
)
self.last_updated = time.time()
def get_expertise_summary(self) -> Dict:
"""Get a summary of domain expertise for prompt enrichment."""
top_entities = sorted(
self.key_entities.items(), key=lambda x: x[1], reverse=True
)[:10]
top_sources = sorted(
self.trusted_sources.items(), key=lambda x: x[1], reverse=True
)[:5]
return {
"domain": self.domain,
"case_count": self.case_count,
"success_rate": round(self.success_rate, 2),
"typical_confidence": round(self.typical_confidence, 2),
"key_entities": [e for e, _ in top_entities],
"trusted_sources": [
{"source": s, "trust": round(t, 2)} for s, t in top_sources
],
"common_patterns": dict(self.common_patterns),
}
def to_dict(self) -> Dict:
return {
"domain": self.domain,
"case_count": self.case_count,
"success_rate": self.success_rate,
"key_entities": dict(self.key_entities),
"common_patterns": dict(self.common_patterns),
"trusted_sources": dict(self.trusted_sources),
"typical_confidence": self.typical_confidence,
"last_updated": self.last_updated,
}
@classmethod
def from_dict(cls, data: Dict) -> "DomainExpertise":
expertise = cls(data["domain"])
expertise.case_count = data.get("case_count", 0)
expertise.success_rate = data.get("success_rate", 0.0)
expertise.key_entities = defaultdict(int, data.get("key_entities", {}))
expertise.common_patterns = defaultdict(int, data.get("common_patterns", {}))
expertise.trusted_sources = defaultdict(
lambda: 0.5, data.get("trusted_sources", {})
)
expertise.typical_confidence = data.get("typical_confidence", 0.5)
expertise.last_updated = data.get("last_updated", time.time())
return expertise
class CrossCasePatternRecognizer:
"""Finds patterns across ALL cases that no single query reveals."""
def __init__(self):
self.domain_cooccurrence: Dict[str, Dict[str, int]] = defaultdict(
lambda: defaultdict(int)
)
self.query_complexity_trend: List[Dict] = []
self.source_effectiveness: Dict[str, Dict[str, float]] = defaultdict(
lambda: {"hits": 0, "total": 0}
)
self.time_patterns: Dict[str, List[float]] = defaultdict(list)
def update_from_case(self, case: Dict, elapsed: float) -> None:
"""Absorb case into pattern recognition."""
route = case.get("route", {})
domain = route.get("domain", "general")
complexity = route.get("complexity", "medium")
# Track complexity trends
self.query_complexity_trend.append(
{
"domain": domain,
"complexity": complexity,
"timestamp": time.time(),
}
)
# Track time patterns
self.time_patterns[domain].append(elapsed)
# Track source effectiveness
research = case.get("research", {})
final = case.get("final", {})
confidence = final.get("confidence", 0.5)
for source in research.get("sources", []):
source_name = (
source.split("://")[-1].split("/")[0] if "://" in source else source
)
if source_name:
self.source_effectiveness[source_name]["hits"] += (
1 if confidence > 0.7 else 0
)
self.source_effectiveness[source_name]["total"] += 1
def get_insights(self) -> Dict:
"""Get cross-case insights."""
# Most effective sources
effective_sources = {}
for source, stats in self.source_effectiveness.items():
if stats["total"] >= 2:
effective_sources[source] = round(stats["hits"] / stats["total"], 2)
# Average response times by domain
avg_times = {}
for domain, times in self.time_patterns.items():
if times:
avg_times[domain] = round(sum(times) / len(times), 1)
# Recent complexity distribution
recent = self.query_complexity_trend[-50:]
complexity_dist = defaultdict(int)
for q in recent:
complexity_dist[q["complexity"]] += 1
return {
"effective_sources": effective_sources,
"avg_response_times": avg_times,
"complexity_distribution": dict(complexity_dist),
"total_cases_analyzed": len(self.query_complexity_trend),
}
def to_dict(self) -> Dict:
return {
"domain_cooccurrence": {
k: dict(v) for k, v in self.domain_cooccurrence.items()
},
"query_complexity_trend": self.query_complexity_trend[
-100:
], # Keep last 100
"source_effectiveness": dict(self.source_effectiveness),
"time_patterns": {
k: v[-50:] for k, v in self.time_patterns.items()
}, # Keep last 50 per domain
}
@classmethod
def from_dict(cls, data: Dict) -> "CrossCasePatternRecognizer":
recognizer = cls()
cooc = data.get("domain_cooccurrence", {})
for k, v in cooc.items():
for k2, v2 in v.items():
recognizer.domain_cooccurrence[k][k2] = v2
recognizer.query_complexity_trend = data.get("query_complexity_trend", [])
se = data.get("source_effectiveness", {})
for k, v in se.items():
recognizer.source_effectiveness[k] = v
tp = data.get("time_patterns", {})
for k, v in tp.items():
recognizer.time_patterns[k] = v
return recognizer
class AdaptiveIntelligence:
"""The meta-brain of MiroOrg — learns, adapts, and accumulates intelligence."""
def __init__(self, data_dir: Path = DATA_DIR):
self.data_dir = data_dir
self.data_dir.mkdir(parents=True, exist_ok=True)
self.domain_expertise: Dict[str, DomainExpertise] = {}
self.pattern_recognizer = CrossCasePatternRecognizer()
self.system_personality = {
"analytical_depth": 0.7, # How deep the analysis goes (0-1)
"confidence_threshold": 0.6, # Minimum confidence to state something as fact
"skepticism_level": 0.3, # How skeptical of unverified claims (0-1)
"actionability_focus": 0.8, # How much to focus on actionable insights (0-1)
"deliberation_threshold": 0.5, # Controls internal reflection loops (0-1)
"cognitive_breadth": 0.5, # Controls diversity of search/strategy (0-1)
"socratic_depth": 0.4, # How critical the verification is (0-1)
}
self.total_cases = 0
self._load()
def _load(self) -> None:
"""Load persisted intelligence."""
expertise_file = self.data_dir / "domain_expertise.json"
if expertise_file.exists():
try:
with open(expertise_file) as f:
data = json.load(f)
for domain_data in data:
expertise = DomainExpertise.from_dict(domain_data)
self.domain_expertise[expertise.domain] = expertise
except Exception:
pass
patterns_file = self.data_dir / "patterns.json"
if patterns_file.exists():
try:
with open(patterns_file) as f:
data = json.load(f)
self.pattern_recognizer = CrossCasePatternRecognizer.from_dict(data)
except Exception:
pass
personality_file = self.data_dir / "personality.json"
if personality_file.exists():
try:
with open(personality_file) as f:
self.system_personality = json.load(f)
except Exception:
pass
meta_file = self.data_dir / "meta.json"
if meta_file.exists():
try:
with open(meta_file) as f:
meta = json.load(f)
self.total_cases = meta.get("total_cases", 0)
except Exception:
pass
def _save(self) -> None:
"""Persist intelligence to disk."""
try:
with open(self.data_dir / "domain_expertise.json", "w") as f:
json.dump(
[e.to_dict() for e in self.domain_expertise.values()], f, indent=2
)
with open(self.data_dir / "patterns.json", "w") as f:
json.dump(self.pattern_recognizer.to_dict(), f, indent=2)
with open(self.data_dir / "personality.json", "w") as f:
json.dump(self.system_personality, f, indent=2)
with open(self.data_dir / "meta.json", "w") as f:
json.dump({"total_cases": self.total_cases}, f, indent=2)
except Exception as e:
import logging
logging.getLogger(__name__).error(
f"Failed to save adaptive intelligence: {e}"
)
def learn_from_case(self, case: Dict, elapsed: float) -> None:
"""Absorb a completed case into the system's intelligence."""
self.total_cases += 1
route = case.get("route", {})
domain = route.get("domain", "general")
# Update domain expertise
if domain not in self.domain_expertise:
self.domain_expertise[domain] = DomainExpertise(domain)
self.domain_expertise[domain].update_from_case(case)
# Update cross-case patterns
self.pattern_recognizer.update_from_case(case, elapsed)
# Adapt system personality based on accumulated experience
self._adapt_personality()
# Save every 5 cases
if self.total_cases % 5 == 0:
self._save()
def _adapt_personality(self) -> None:
"""Adapt system personality based on accumulated experience."""
if self.total_cases < 10:
return # Need more data
# Calculate overall system confidence
all_confidences = []
for expertise in self.domain_expertise.values():
if expertise.case_count > 0:
all_confidences.append(expertise.success_rate)
if not all_confidences:
return
avg_confidence = sum(all_confidences) / len(all_confidences)
# If system is consistently confident, increase analytical depth
if avg_confidence > 0.75:
self.system_personality["analytical_depth"] = min(
0.95, self.system_personality["analytical_depth"] + 0.02
)
elif avg_confidence < 0.5:
self.system_personality["analytical_depth"] = max(
0.4, self.system_personality["analytical_depth"] - 0.02
)
self.system_personality["skepticism_level"] = min(
0.7, self.system_personality["skepticism_level"] + 0.02
)
def get_context_for_query(self, query: str, domain: str) -> Dict:
"""Get accumulated intelligence context for a new query."""
context = {
"system_personality": self.system_personality,
"total_cases_learned": self.total_cases,
}
# Add domain expertise if available
if domain in self.domain_expertise:
expertise = self.domain_expertise[domain]
context["domain_expertise"] = expertise.get_expertise_summary()
# Add cross-case insights
context["cross_case_insights"] = self.pattern_recognizer.get_insights()
return context
def get_full_intelligence_report(self) -> Dict:
"""Get complete intelligence report."""
return {
"total_cases": self.total_cases,
"system_personality": self.system_personality,
"domain_expertise": {
domain: expertise.get_expertise_summary()
for domain, expertise in self.domain_expertise.items()
},
"cross_case_insights": self.pattern_recognizer.get_insights(),
}
def save(self) -> None:
"""Force save."""
self._save()
# Global instance
adaptive_intelligence = AdaptiveIntelligence()