petter2025 commited on
Commit
84ff88c
·
verified ·
1 Parent(s): 141f2db

Update policy_engine.py

Browse files
Files changed (1) hide show
  1. policy_engine.py +31 -2
policy_engine.py CHANGED
@@ -62,7 +62,15 @@ DEFAULT_HEALING_POLICIES = [
62
 
63
  class PolicyEngine:
64
  """
65
- Thread-safe policy engine with cooldown and rate limiting.
 
 
 
 
 
 
 
 
66
  """
67
 
68
  def __init__(
@@ -71,9 +79,16 @@ class PolicyEngine:
71
  max_cooldown_history: int = 10000,
72
  max_execution_history: int = 1000
73
  ):
 
 
 
 
 
 
74
  self.policies = policies or DEFAULT_HEALING_POLICIES
75
  self._lock = threading.RLock()
76
- self.last_execution: Dict[str, float] = OrderedDict()
 
77
  self.max_cooldown_history = max_cooldown_history
78
  self.execution_timestamps: Dict[str, List[float]] = {}
79
  self.max_execution_history = max_execution_history
@@ -81,6 +96,10 @@ class PolicyEngine:
81
  logger.info(f"Initialized PolicyEngine with {len(self.policies)} policies")
82
 
83
  def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
 
 
 
 
84
  applicable_actions = []
85
  current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
86
  for policy in self.policies:
@@ -95,8 +114,15 @@ class PolicyEngine:
95
  continue
96
  if self._evaluate_conditions(policy.conditions, event):
97
  applicable_actions.extend(policy.actions)
 
98
  self.last_execution[policy_key] = current_time
 
 
 
 
99
  self._record_execution(policy_key, current_time)
 
 
100
  seen = set()
101
  unique = []
102
  for a in applicable_actions:
@@ -106,6 +132,7 @@ class PolicyEngine:
106
  return unique if unique else [HealingAction.NO_ACTION]
107
 
108
  def _evaluate_conditions(self, conditions: List[PolicyCondition], event: ReliabilityEvent) -> bool:
 
109
  for cond in conditions:
110
  metric = cond.metric
111
  op = cond.operator
@@ -133,6 +160,7 @@ class PolicyEngine:
133
  return True
134
 
135
  def _is_rate_limited(self, key: str, policy: HealingPolicy, now: float) -> bool:
 
136
  if key not in self.execution_timestamps:
137
  return False
138
  one_hour_ago = now - 3600
@@ -141,6 +169,7 @@ class PolicyEngine:
141
  return len(recent) >= policy.max_executions_per_hour
142
 
143
  def _record_execution(self, key: str, ts: float):
 
144
  if key not in self.execution_timestamps:
145
  self.execution_timestamps[key] = []
146
  self.execution_timestamps[key].append(ts)
 
62
 
63
  class PolicyEngine:
64
  """
65
+ Threadsafe policy engine with cooldown and rate limiting.
66
+
67
+ Policies are evaluated in priority order. Each policy has:
68
+ - conditions (AND logic)
69
+ - cooldown per (policy, component)
70
+ - rate limit per hour
71
+
72
+ The engine maintains an LRU cache of last execution timestamps
73
+ (using OrderedDict) to bound memory usage.
74
  """
75
 
76
  def __init__(
 
79
  max_cooldown_history: int = 10000,
80
  max_execution_history: int = 1000
81
  ):
82
+ """
83
+ Args:
84
+ policies: List of HealingPolicy objects. If None, DEFAULT_HEALING_POLICIES used.
85
+ max_cooldown_history: Maximum number of (policy, component) entries to keep.
86
+ max_execution_history: Maximum number of timestamps stored per policy for rate limiting.
87
+ """
88
  self.policies = policies or DEFAULT_HEALING_POLICIES
89
  self._lock = threading.RLock()
90
+ # OrderedDict acts as an LRU cache: last item is most recent.
91
+ self.last_execution: OrderedDict[str, float] = OrderedDict()
92
  self.max_cooldown_history = max_cooldown_history
93
  self.execution_timestamps: Dict[str, List[float]] = {}
94
  self.max_execution_history = max_execution_history
 
96
  logger.info(f"Initialized PolicyEngine with {len(self.policies)} policies")
97
 
98
  def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
99
+ """
100
+ Evaluate all policies against the event and return the set of actions
101
+ triggered (deduplicated). Returns [NO_ACTION] if none triggered.
102
+ """
103
  applicable_actions = []
104
  current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
105
  for policy in self.policies:
 
114
  continue
115
  if self._evaluate_conditions(policy.conditions, event):
116
  applicable_actions.extend(policy.actions)
117
+ # Update cooldown
118
  self.last_execution[policy_key] = current_time
119
+ self.last_execution.move_to_end(policy_key) # mark as most recent
120
+ # Enforce cache size
121
+ if len(self.last_execution) > self.max_cooldown_history:
122
+ self.last_execution.popitem(last=False)
123
  self._record_execution(policy_key, current_time)
124
+
125
+ # Deduplicate actions while preserving order
126
  seen = set()
127
  unique = []
128
  for a in applicable_actions:
 
132
  return unique if unique else [HealingAction.NO_ACTION]
133
 
134
  def _evaluate_conditions(self, conditions: List[PolicyCondition], event: ReliabilityEvent) -> bool:
135
+ """Return True if all conditions are satisfied."""
136
  for cond in conditions:
137
  metric = cond.metric
138
  op = cond.operator
 
160
  return True
161
 
162
  def _is_rate_limited(self, key: str, policy: HealingPolicy, now: float) -> bool:
163
+ """Check if the policy has exceeded its hourly execution limit."""
164
  if key not in self.execution_timestamps:
165
  return False
166
  one_hour_ago = now - 3600
 
169
  return len(recent) >= policy.max_executions_per_hour
170
 
171
  def _record_execution(self, key: str, ts: float):
172
+ """Record an execution timestamp for rate limiting."""
173
  if key not in self.execution_timestamps:
174
  self.execution_timestamps[key] = []
175
  self.execution_timestamps[key].append(ts)