| import logging |
| import numpy as np |
| from typing import Dict, Any, List |
| from agentic_reliability_framework.runtime.agents.base import BaseAgent, AgentSpecialization |
| from ai_event import AIEvent |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class MemoryDriftDiagnosticianAgent(BaseAgent): |
| """ |
| Detects drift in semantic memory by comparing current retrieval scores |
| with their historical distribution using a z‑score test. |
| """ |
|
|
| def __init__(self, history_window: int = 100, zscore_threshold: float = 2.0): |
| super().__init__(AgentSpecialization.DIAGNOSTICIAN) |
| self.history_window = history_window |
| self.zscore_threshold = zscore_threshold |
| self._retrieval_scores_history: List[float] = [] |
|
|
| async def analyze(self, event: AIEvent) -> Dict[str, Any]: |
| try: |
| if not event.retrieval_scores: |
| return { |
| 'specialization': 'ai_memory_drift', |
| 'confidence': 0.0, |
| 'findings': {}, |
| 'recommendations': [] |
| } |
|
|
| current_avg = float(np.mean(event.retrieval_scores)) |
| self._retrieval_scores_history.append(current_avg) |
|
|
| if len(self._retrieval_scores_history) > self.history_window: |
| self._retrieval_scores_history.pop(0) |
|
|
| if len(self._retrieval_scores_history) < 10: |
| return { |
| 'specialization': 'ai_memory_drift', |
| 'confidence': 0.0, |
| 'findings': { |
| 'drift_detected': False, |
| 'current_avg': current_avg, |
| 'historical_avg': None, |
| 'z_score': None |
| }, |
| 'recommendations': [] |
| } |
|
|
| historical_avg = float(np.mean(self._retrieval_scores_history[:-1])) |
| historical_std = float(np.std(self._retrieval_scores_history[:-1])) + 1e-6 |
| z_score = (current_avg - historical_avg) / historical_std |
| drift_detected = abs(z_score) > self.zscore_threshold |
| confidence = min(1.0, abs(z_score) / 5.0) |
|
|
| return { |
| 'specialization': 'ai_memory_drift', |
| 'confidence': confidence, |
| 'findings': { |
| 'drift_detected': drift_detected, |
| 'current_avg': current_avg, |
| 'historical_avg': historical_avg, |
| 'z_score': float(z_score) |
| }, |
| 'recommendations': [ |
| "Reindex knowledge base", |
| "Adjust embedding model", |
| "Update context window" |
| ] if drift_detected else [] |
| } |
| except Exception as e: |
| logger.error(f"MemoryDriftDiagnostician error: {e}", exc_info=True) |
| return { |
| 'specialization': 'ai_memory_drift', |
| 'confidence': 0.0, |
| 'findings': {}, |
| 'recommendations': [] |
| } |