petter2025 commited on
Commit
162cb4f
·
verified ·
1 Parent(s): 47c50e9

Update memory_drift_diagnostician.py

Browse files
Files changed (1) hide show
  1. memory_drift_diagnostician.py +68 -15
memory_drift_diagnostician.py CHANGED
@@ -1,36 +1,83 @@
1
  import logging
2
  import numpy as np
3
- from typing import Dict, Any, List
4
  from agentic_reliability_framework.runtime.agents.base import BaseAgent, AgentSpecialization
5
  from ai_event import AIEvent
6
 
7
  logger = logging.getLogger(__name__)
8
 
 
9
  class MemoryDriftDiagnosticianAgent(BaseAgent):
10
- """Detects drift in semantic memory by comparing current retrieval scores with historical distribution."""
11
- def __init__(self, history_window: int = 100):
 
 
 
 
 
 
 
 
 
12
  super().__init__(AgentSpecialization.DIAGNOSTICIAN)
13
  self.history_window = history_window
 
14
  self._retrieval_scores_history: List[float] = []
15
 
16
  async def analyze(self, event: AIEvent) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
 
17
  try:
 
18
  if not event.retrieval_scores:
19
- return {'specialization': 'ai_memory_drift', 'confidence': 0, 'findings': {}, 'recommendations': []}
 
 
 
 
 
 
 
20
  current_avg = float(np.mean(event.retrieval_scores))
21
  self._retrieval_scores_history.append(current_avg)
 
 
22
  if len(self._retrieval_scores_history) > self.history_window:
23
  self._retrieval_scores_history.pop(0)
24
 
 
25
  if len(self._retrieval_scores_history) < 10:
26
- drift_detected = False
27
- confidence = 0.0
28
- else:
29
- historical_avg = float(np.mean(self._retrieval_scores_history[:-1]))
30
- historical_std = float(np.std(self._retrieval_scores_history[:-1])) + 1e-6
31
- z_score = (current_avg - historical_avg) / historical_std
32
- drift_detected = abs(z_score) > 2.0
33
- confidence = min(1.0, abs(z_score) / 5.0)
 
 
 
 
 
 
 
 
 
 
 
 
34
 
35
  return {
36
  'specialization': 'ai_memory_drift',
@@ -38,8 +85,8 @@ class MemoryDriftDiagnosticianAgent(BaseAgent):
38
  'findings': {
39
  'drift_detected': drift_detected,
40
  'current_avg': current_avg,
41
- 'historical_avg': historical_avg if len(self._retrieval_scores_history) > 1 else None,
42
- 'z_score': float(z_score) if len(self._retrieval_scores_history) > 1 else None
43
  },
44
  'recommendations': [
45
  "Reindex knowledge base",
@@ -47,6 +94,12 @@ class MemoryDriftDiagnosticianAgent(BaseAgent):
47
  "Update context window"
48
  ] if drift_detected else []
49
  }
 
50
  except Exception as e:
51
  logger.error(f"MemoryDriftDiagnostician error: {e}", exc_info=True)
52
- return {'specialization': 'ai_memory_drift', 'confidence': 0.0, 'findings': {}, 'recommendations': []}
 
 
 
 
 
 
1
  import logging
2
  import numpy as np
3
+ from typing import Dict, Any, List, Optional
4
  from agentic_reliability_framework.runtime.agents.base import BaseAgent, AgentSpecialization
5
  from ai_event import AIEvent
6
 
7
  logger = logging.getLogger(__name__)
8
 
9
+
10
  class MemoryDriftDiagnosticianAgent(BaseAgent):
11
+ """
12
+ Detects drift in semantic memory by comparing current retrieval scores
13
+ with their historical distribution using a z‑score test.
14
+ """
15
+
16
+ def __init__(self, history_window: int = 100, zscore_threshold: float = 2.0):
17
+ """
18
+ Args:
19
+ history_window: Number of recent scores to keep for baseline statistics.
20
+ zscore_threshold: Absolute z‑score above which drift is flagged.
21
+ """
22
  super().__init__(AgentSpecialization.DIAGNOSTICIAN)
23
  self.history_window = history_window
24
+ self.zscore_threshold = zscore_threshold
25
  self._retrieval_scores_history: List[float] = []
26
 
27
  async def analyze(self, event: AIEvent) -> Dict[str, Any]:
28
+ """
29
+ Analyze retrieval scores for drift.
30
+
31
+ Args:
32
+ event: AIEvent containing `retrieval_scores` (list of floats).
33
+
34
+ Returns:
35
+ Dictionary with keys:
36
+ - specialization: str
37
+ - confidence: float (0‑1) based on z‑score magnitude
38
+ - findings: dict with drift detection and statistics
39
+ - recommendations: list of strings if drift detected
40
+ """
41
  try:
42
+ # If no retrieval scores, cannot compute drift
43
  if not event.retrieval_scores:
44
+ return {
45
+ 'specialization': 'ai_memory_drift',
46
+ 'confidence': 0.0,
47
+ 'findings': {},
48
+ 'recommendations': []
49
+ }
50
+
51
+ # Current average score
52
  current_avg = float(np.mean(event.retrieval_scores))
53
  self._retrieval_scores_history.append(current_avg)
54
+
55
+ # Trim history to window size
56
  if len(self._retrieval_scores_history) > self.history_window:
57
  self._retrieval_scores_history.pop(0)
58
 
59
+ # Need at least 10 points for a reliable baseline
60
  if len(self._retrieval_scores_history) < 10:
61
+ return {
62
+ 'specialization': 'ai_memory_drift',
63
+ 'confidence': 0.0,
64
+ 'findings': {
65
+ 'drift_detected': False,
66
+ 'current_avg': current_avg,
67
+ 'historical_avg': None,
68
+ 'z_score': None
69
+ },
70
+ 'recommendations': []
71
+ }
72
+
73
+ # Historical baseline (excluding current point)
74
+ historical_avg = float(np.mean(self._retrieval_scores_history[:-1]))
75
+ historical_std = float(np.std(self._retrieval_scores_history[:-1])) + 1e-6 # avoid division by zero
76
+ z_score = (current_avg - historical_avg) / historical_std
77
+ drift_detected = abs(z_score) > self.zscore_threshold
78
+
79
+ # Confidence derived from z‑score magnitude (capped at 1.0)
80
+ confidence = min(1.0, abs(z_score) / 5.0)
81
 
82
  return {
83
  'specialization': 'ai_memory_drift',
 
85
  'findings': {
86
  'drift_detected': drift_detected,
87
  'current_avg': current_avg,
88
+ 'historical_avg': historical_avg,
89
+ 'z_score': float(z_score)
90
  },
91
  'recommendations': [
92
  "Reindex knowledge base",
 
94
  "Update context window"
95
  ] if drift_detected else []
96
  }
97
+
98
  except Exception as e:
99
  logger.error(f"MemoryDriftDiagnostician error: {e}", exc_info=True)
100
+ return {
101
+ 'specialization': 'ai_memory_drift',
102
+ 'confidence': 0.0,
103
+ 'findings': {},
104
+ 'recommendations': []
105
+ }