Agentic-Reliability-Framework-v4 / memory_drift_diagnostician.py
petter2025's picture
Update memory_drift_diagnostician.py
162cb4f verified
raw
history blame
4.1 kB
import logging
import numpy as np
from typing import Dict, Any, List, Optional
from agentic_reliability_framework.runtime.agents.base import BaseAgent, AgentSpecialization
from ai_event import AIEvent
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MemoryDriftDiagnosticianAgent(BaseAgent):
    """
    Detects drift in semantic memory by comparing current retrieval scores
    with their historical distribution using a z-score test.

    Each call to ``analyze`` reduces the event's retrieval scores to their
    mean, stores that mean in a bounded history, and compares it against
    the mean and (population) standard deviation of the previously stored
    means.
    """

    # Value reported under the 'specialization' key of every result.
    _SPECIALIZATION = 'ai_memory_drift'
    # Minimum number of stored averages (including the current one) needed
    # before a z-score is computed.
    _MIN_HISTORY_POINTS = 10
    # |z-score| at which the reported confidence saturates at 1.0.
    _CONFIDENCE_Z_SCALE = 5.0
    # Added to the baseline standard deviation to avoid division by zero.
    # NOTE: with a perfectly flat baseline this makes even a tiny deviation
    # produce a very large z-score.
    _STD_EPSILON = 1e-6
    # Recommendations returned when drift is flagged.
    _DRIFT_RECOMMENDATIONS = (
        "Reindex knowledge base",
        "Adjust embedding model",
        "Update context window",
    )

    def __init__(self, history_window: int = 100, zscore_threshold: float = 2.0):
        """
        Args:
            history_window: Number of recent scores to keep for baseline statistics.
            zscore_threshold: Absolute z-score above which drift is flagged.
        """
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        self.history_window = history_window
        self.zscore_threshold = zscore_threshold
        self._retrieval_scores_history: List[float] = []
        if history_window < self._MIN_HISTORY_POINTS:
            # The history can never hold enough points for a baseline, so
            # drift would silently never be reported; make that visible.
            logger.warning(
                "MemoryDriftDiagnostician: history_window=%s is below the %s "
                "points required for a baseline; drift will never be detected",
                history_window,
                self._MIN_HISTORY_POINTS,
            )

    @classmethod
    def _build_result(
        cls,
        confidence: float = 0.0,
        findings: Optional[Dict[str, Any]] = None,
        recommendations: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """Assemble the result dictionary shared by every return path."""
        return {
            'specialization': cls._SPECIALIZATION,
            'confidence': confidence,
            'findings': {} if findings is None else findings,
            'recommendations': [] if recommendations is None else recommendations,
        }

    async def analyze(self, event: AIEvent) -> Dict[str, Any]:
        """
        Analyze retrieval scores for drift.

        Args:
            event: AIEvent containing `retrieval_scores` (list of floats).

        Returns:
            Dictionary with keys:
                - specialization: str
                - confidence: float (0-1) based on z-score magnitude
                - findings: dict with drift detection and statistics
                  (empty when there are no usable scores or on error)
                - recommendations: list of strings if drift detected

        Any exception raised during analysis is logged and converted into
        a zero-confidence result with empty findings.
        """
        try:
            raw_scores = event.retrieval_scores
            # If no retrieval scores, cannot compute drift. Checking None
            # and size explicitly (rather than truthiness) also accepts
            # NumPy arrays, whose truth value is ambiguous.
            if raw_scores is None:
                return self._build_result()
            scores = np.asarray(raw_scores, dtype=float)
            if scores.size == 0:
                return self._build_result()

            # Current average score
            current_avg = float(scores.mean())
            if not np.isfinite(current_avg):
                # A NaN/inf average must not enter the history: it would
                # turn every later baseline statistic into NaN until it
                # aged out of the window.
                logger.warning(
                    "MemoryDriftDiagnostician: ignoring non-finite retrieval scores"
                )
                return self._build_result()

            history = self._retrieval_scores_history
            history.append(current_avg)

            # Trim history to window size (drops all excess entries, so the
            # history also shrinks if history_window was lowered).
            overflow = len(history) - self.history_window
            if overflow > 0:
                del history[:overflow]

            # Need enough points for a reliable baseline
            if len(history) < self._MIN_HISTORY_POINTS:
                return self._build_result(
                    findings={
                        'drift_detected': False,
                        'current_avg': current_avg,
                        'historical_avg': None,
                        'z_score': None,
                    }
                )

            # Historical baseline (excluding current point)
            baseline = history[:-1]
            historical_avg = float(np.mean(baseline))
            historical_std = float(np.std(baseline)) + self._STD_EPSILON
            z_score = (current_avg - historical_avg) / historical_std
            drift_detected = abs(z_score) > self.zscore_threshold

            # Confidence derived from z-score magnitude (capped at 1.0)
            confidence = min(1.0, abs(z_score) / self._CONFIDENCE_Z_SCALE)

            return self._build_result(
                confidence=confidence,
                findings={
                    'drift_detected': drift_detected,
                    'current_avg': current_avg,
                    'historical_avg': historical_avg,
                    'z_score': float(z_score),
                },
                recommendations=(
                    list(self._DRIFT_RECOMMENDATIONS) if drift_detected else []
                ),
            )
        except Exception as e:
            # Deliberate best-effort boundary: a failing diagnostician must
            # not break the caller, so log and return a neutral result.
            logger.error("MemoryDriftDiagnostician error: %s", e, exc_info=True)
            return self._build_result()