# Agentic-Reliability-Framework-v4 / memory_drift_diagnostician.py
# petter2025's picture
# Update memory_drift_diagnostician.py
# c2e178b verified
# raw
# history blame
# 3.07 kB
import logging
import numpy as np
from typing import Dict, Any, List
from agentic_reliability_framework.runtime.agents.base import BaseAgent, AgentSpecialization
from ai_event import AIEvent
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class MemoryDriftDiagnosticianAgent(BaseAgent):
    """
    Detect drift in semantic memory by z-scoring retrieval quality.

    Every analyzed event contributes the mean of its retrieval scores to a
    bounded rolling history. Once enough samples have been collected, the
    newest mean is compared with the mean and standard deviation of the
    earlier samples in the window; a large absolute z-score is reported as
    drift.
    """

    # Value reported under the 'specialization' key of every result.
    _SPECIALIZATION = 'ai_memory_drift'
    # Added to the baseline standard deviation so a perfectly flat history
    # cannot cause a division by zero.
    _STD_EPSILON = 1e-6
    # An absolute z-score at or above this value maps to confidence 1.0.
    _CONFIDENCE_Z_SCALE = 5.0
    # Floor for the required sample count: the newest sample plus at least
    # two baseline samples, so the baseline spread is not trivially zero.
    _MIN_REQUIRED_SAMPLES = 3
    # Suggestions returned whenever drift is detected.
    _DRIFT_RECOMMENDATIONS = (
        "Reindex knowledge base",
        "Adjust embedding model",
        "Update context window",
    )

    def __init__(
        self,
        history_window: int = 100,
        zscore_threshold: float = 2.0,
        min_samples: int = 10,
    ):
        """
        Create the agent.

        Args:
            history_window: Maximum number of per-event score means kept in
                the rolling history.
            zscore_threshold: Absolute z-score above which drift is reported.
            min_samples: Number of samples (including the current one) that
                must be present before a z-score is computed. It is capped
                at ``history_window`` so that a small window does not leave
                the detector permanently short of data.
        """
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        self.history_window = history_window
        self.zscore_threshold = zscore_threshold
        self.min_samples = min_samples
        self._retrieval_scores_history: List[float] = []

    def _build_result(
        self,
        confidence: float = 0.0,
        findings: Any = None,
        recommendations: Any = None,
    ) -> Dict[str, Any]:
        """Assemble a result dict; omitted parts default to empty containers."""
        return {
            'specialization': self._SPECIALIZATION,
            'confidence': confidence,
            'findings': {} if findings is None else findings,
            'recommendations': [] if recommendations is None else recommendations,
        }

    def _required_samples(self) -> int:
        """Return how many samples are needed before a z-score is computed."""
        capped = min(self.min_samples, self.history_window)
        return max(self._MIN_REQUIRED_SAMPLES, capped)

    async def analyze(self, event: AIEvent) -> Dict[str, Any]:
        """
        Analyze one event for retrieval-score drift.

        Args:
            event: Event whose ``retrieval_scores`` attribute holds the
                scores to evaluate.

        Returns:
            A dict with the keys ``'specialization'``, ``'confidence'``,
            ``'findings'`` and ``'recommendations'``. ``'findings'`` is empty
            when the event has no usable scores or an error occurred;
            otherwise it holds ``'drift_detected'``, ``'current_avg'``,
            ``'historical_avg'`` and ``'z_score'`` (the last two are ``None``
            while the history is still too short).
        """
        try:
            scores = event.retrieval_scores
            if scores is None:
                return self._build_result()

            # Convert first: truth-testing a multi-element NumPy array raises
            # ValueError, so emptiness is checked via ``size`` instead.
            values = np.asarray(scores, dtype=float)
            if values.size == 0:
                return self._build_result()

            current_avg = float(np.mean(values))
            if not np.isfinite(current_avg):
                # A NaN/inf mean would poison every later baseline until it
                # left the window, and ``min(1.0, nan)`` would yield a bogus
                # confidence of 1.0, so the sample is dropped.
                logger.warning(
                    "MemoryDriftDiagnostician skipped non-finite retrieval score mean: %s",
                    current_avg,
                )
                return self._build_result()

            history = self._retrieval_scores_history
            history.append(current_avg)
            # Trim to the window in one step; this also recovers if the
            # history is longer than the window by more than one entry.
            overflow = len(history) - self.history_window
            if overflow > 0:
                del history[:overflow]

            if len(history) < self._required_samples():
                return self._build_result(
                    findings={
                        'drift_detected': False,
                        'current_avg': current_avg,
                        'historical_avg': None,
                        'z_score': None,
                    },
                )

            # The baseline excludes the sample that was just appended.
            baseline = history[:-1]
            historical_avg = float(np.mean(baseline))
            historical_std = float(np.std(baseline)) + self._STD_EPSILON
            z_score = (current_avg - historical_avg) / historical_std
            drift_detected = abs(z_score) > self.zscore_threshold
            confidence = min(1.0, abs(z_score) / self._CONFIDENCE_Z_SCALE)

            return self._build_result(
                confidence=confidence,
                findings={
                    'drift_detected': drift_detected,
                    'current_avg': current_avg,
                    'historical_avg': historical_avg,
                    'z_score': float(z_score),
                },
                recommendations=(
                    list(self._DRIFT_RECOMMENDATIONS) if drift_detected else []
                ),
            )
        except Exception as e:
            # Deliberate best-effort boundary: a diagnostics failure is
            # logged with its traceback and reported as an empty result.
            logger.exception("MemoryDriftDiagnostician error: %s", e)
            return self._build_result()