Janus-backend / backend /app /services /sentinel /capability_tracker.py
DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Sentinel Capability Tracker - System Capability Metrics
Computes and tracks a composite capability score across 6 dimensions.
Reads existing data directories. Never calls any LLM. Never writes to
any existing data directory - only writes to data/sentinel/.
"""
import json
import logging
import os
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, List, Optional
logger = logging.getLogger(__name__)
# IMPORTANT COMMENT (must appear in source code):
# agi_progression_index is a VANITY METRIC for developer motivation.
# It is a weighted composite of system health indicators:
# error rate, confidence scores, trust quality, skill growth, etc.
# It does NOT measure general intelligence, emergent reasoning, or AGI.
# The name is aspirational and intentionally optimistic, not scientific.
class CapabilitySnapshot:
"""Represents a snapshot of system capabilities."""
def __init__(
self,
scores: Dict[str, float],
agi_progression_index: float,
delta_from_last: Dict[str, float]
):
self.snapshot_id = str(uuid.uuid4())
self.timestamp = datetime.utcnow().isoformat()
self.scores = scores
self.agi_progression_index = agi_progression_index
self.delta_from_last = delta_from_last
def to_dict(self) -> Dict[str, Any]:
return {
"snapshot_id": self.snapshot_id,
"timestamp": self.timestamp,
"scores": self.scores,
"agi_progression_index": self.agi_progression_index,
"delta_from_last": self.delta_from_last,
}
from app.config import DATA_DIR, SENTINEL_DIR
class CapabilityTracker:
"""Tracks system capability metrics over time."""
def __init__(self):
self.data_dir = DATA_DIR
self.sentinel_dir = SENTINEL_DIR
self.sentinel_dir.mkdir(parents=True, exist_ok=True)
def snapshot(self) -> CapabilitySnapshot:
"""
Compute current capability snapshot.
Returns:
CapabilitySnapshot with all dimensions and AGI index
"""
# Compute all 6 dimensions
scores = {
"reasoning_depth": self._compute_reasoning_depth(),
"source_trust_avg": self._compute_source_trust_avg(),
"skill_coverage": self._compute_skill_coverage(),
"prompt_win_rate_avg": self._compute_prompt_win_rate_avg(),
"stability": self._compute_stability(),
"self_correction_rate": self._compute_self_correction_rate(),
}
# Compute AGI progression index
agi_index = self._compute_agi_index(scores)
# Get delta from last snapshot
delta_from_last = self._compute_delta(scores, agi_index)
# Create snapshot
snapshot = CapabilitySnapshot(
scores=scores,
agi_progression_index=agi_index,
delta_from_last=delta_from_last
)
# Save to history
self._save_snapshot(snapshot)
return snapshot
def trend(self, days: int = 7) -> List[Dict[str, Any]]:
"""
Get capability trend over specified days.
Args:
days: Number of days to look back
Returns:
List of CapabilitySnapshot dicts
"""
try:
history_file = self.sentinel_dir / "capability_history.json"
if not history_file.exists():
return []
with open(history_file, 'r') as f:
history = json.load(f)
# Filter to last N days
cutoff = datetime.utcnow() - timedelta(days=days)
filtered = []
for entry in history:
try:
entry_time = datetime.fromisoformat(entry["timestamp"])
if entry_time >= cutoff:
filtered.append(entry)
except:
continue
return filtered
except Exception as e:
logger.error(f"Failed to get capability trend: {e}")
return []
def _compute_reasoning_depth(self) -> float:
"""Compute reasoning depth from case confidence scores."""
try:
memory_dir = self.data_dir / "memory"
if not memory_dir.exists():
return 0.5
case_files = sorted(memory_dir.glob("*.json"), key=os.path.getmtime, reverse=True)[:30]
if not case_files:
return 0.5
all_confidences = []
for case_file in case_files:
try:
with open(case_file, 'r') as f:
case_data = json.load(f)
outputs = case_data.get("outputs", [])
for output in outputs:
if isinstance(output, dict):
confidence = output.get("confidence")
if confidence is not None:
all_confidences.append(confidence)
except:
continue
if not all_confidences:
return 0.5
return sum(all_confidences) / len(all_confidences)
except Exception as e:
logger.warning(f"Failed to compute reasoning depth: {e}")
return 0.5
def _compute_source_trust_avg(self) -> float:
"""Compute average source trust score."""
try:
config_file = self.sentinel_dir / "sentinel_config.json"
if not config_file.exists():
return 0.5
with open(config_file, 'r') as f:
config = json.load(f)
trust_scores = config.get("source_trust_scores", {})
if not trust_scores:
return 0.5
values = [v for v in trust_scores.values() if isinstance(v, (int, float))]
if not values:
return 0.5
return sum(values) / len(values)
except Exception as e:
logger.warning(f"Failed to compute source trust avg: {e}")
return 0.5
def _compute_skill_coverage(self) -> float:
"""Compute skill coverage metric."""
try:
skills_dir = self.data_dir / "skills"
if not skills_dir.exists():
return 0.0
skill_count = len(list(skills_dir.glob("*.json")))
# Normalize to 0-1 scale (20 skills = 1.0)
return min(skill_count / 20.0, 1.0)
except Exception as e:
logger.warning(f"Failed to compute skill coverage: {e}")
return 0.0
def _compute_prompt_win_rate_avg(self) -> float:
"""Compute average prompt win rate."""
try:
prompt_versions_dir = self.data_dir / "prompt_versions"
if not prompt_versions_dir.exists():
return 0.5
win_rates = []
for version_file in prompt_versions_dir.glob("*.json"):
try:
with open(version_file, 'r') as f:
version_data = json.load(f)
win_rate = version_data.get("win_rate")
if win_rate is not None:
win_rates.append(win_rate)
except:
continue
if not win_rates:
return 0.5
return sum(win_rates) / len(win_rates)
except Exception as e:
logger.warning(f"Failed to compute prompt win rate avg: {e}")
return 0.5
def _compute_stability(self) -> float:
"""Compute stability (1.0 - error_rate)."""
try:
memory_dir = self.data_dir / "memory"
if not memory_dir.exists():
return 1.0
# Get cases from last 7 days
cutoff = datetime.utcnow() - timedelta(days=7)
total_count = 0
failed_count = 0
for case_file in memory_dir.glob("*.json"):
try:
# Check file modification time
mtime = datetime.fromtimestamp(case_file.stat().st_mtime)
if mtime < cutoff:
continue
with open(case_file, 'r') as f:
case_data = json.load(f)
total_count += 1
# Check if case failed
final_answer = case_data.get("final_answer", "")
outputs = case_data.get("outputs", [])
if not final_answer or not outputs:
failed_count += 1
except:
continue
if total_count == 0:
return 1.0
error_rate = failed_count / total_count
return 1.0 - error_rate
except Exception as e:
logger.warning(f"Failed to compute stability: {e}")
return 1.0
def _compute_self_correction_rate(self) -> float:
"""Compute self-correction rate."""
try:
# Get patches from last 7 days
patch_history_file = self.sentinel_dir / "patch_history.json"
alert_history_file = self.sentinel_dir / "alert_history.json"
cutoff = datetime.utcnow() - timedelta(days=7)
patch_count = 0
if patch_history_file.exists():
with open(patch_history_file, 'r') as f:
patches = json.load(f)
for patch in patches:
try:
patch_time = datetime.fromisoformat(patch["timestamp"])
if patch_time >= cutoff:
patch_count += 1
except:
continue
alert_count = 0
if alert_history_file.exists():
with open(alert_history_file, 'r') as f:
alerts = json.load(f)
for alert in alerts:
try:
alert_time = datetime.fromisoformat(alert["timestamp"])
if alert_time >= cutoff:
alert_count += 1
except:
continue
if alert_count == 0:
return 0.0
return min(patch_count / alert_count, 1.0)
except Exception as e:
logger.warning(f"Failed to compute self-correction rate: {e}")
return 0.0
def _compute_agi_index(self, scores: Dict[str, float]) -> float:
"""
Compute AGI progression index from dimension scores.
Formula:
index = (
(reasoning_depth * 0.25) +
(source_trust_avg * 0.15) +
(skill_coverage * 0.20) +
(prompt_win_rate_avg * 0.20) +
(stability * 0.10) +
(self_correction_rate * 0.10)
)
"""
index = (
(scores.get("reasoning_depth", 0.5) * 0.25) +
(scores.get("source_trust_avg", 0.5) * 0.15) +
(scores.get("skill_coverage", 0.0) * 0.20) +
(scores.get("prompt_win_rate_avg", 0.5) * 0.20) +
(scores.get("stability", 1.0) * 0.10) +
(scores.get("self_correction_rate", 0.0) * 0.10)
)
# Clamp to [0.0, 1.0]
return max(0.0, min(1.0, index))
def _compute_delta(self, scores: Dict[str, float], agi_index: float) -> Dict[str, float]:
"""Compute delta from last snapshot."""
try:
history_file = self.sentinel_dir / "capability_history.json"
if not history_file.exists():
return {k: 0.0 for k in scores.keys()}
with open(history_file, 'r') as f:
history = json.load(f)
if not history:
return {k: 0.0 for k in scores.keys()}
last_snapshot = history[-1]
last_scores = last_snapshot.get("scores", {})
delta = {}
for key, value in scores.items():
last_value = last_scores.get(key, value)
delta[key] = value - last_value
delta["agi_progression_index"] = agi_index - last_snapshot.get("agi_progression_index", agi_index)
return delta
except Exception as e:
logger.warning(f"Failed to compute delta: {e}")
return {k: 0.0 for k in scores.keys()}
def _save_snapshot(self, snapshot: CapabilitySnapshot):
"""Save snapshot to history."""
try:
history_file = self.sentinel_dir / "capability_history.json"
# Load existing history
if history_file.exists():
with open(history_file, 'r') as f:
history = json.load(f)
else:
history = []
# Append new snapshot
history.append(snapshot.to_dict())
# Keep only last 500 entries
if len(history) > 500:
history = history[-500:]
# Save back
with open(history_file, 'w') as f:
json.dump(history, f, indent=2)
except Exception as e:
logger.error(f"Failed to save capability snapshot: {e}")