petter2025 commited on
Commit
85f35f4
·
verified ·
1 Parent(s): dbc4712

Delete policy_engine.py

Browse files
Files changed (1) hide show
  1. policy_engine.py +0 -204
policy_engine.py DELETED
@@ -1,204 +0,0 @@
1
- """
2
- Policy Engine for Automated Healing Actions.
3
- """
4
-
5
- import threading
6
- import logging
7
- import datetime
8
- from collections import OrderedDict
9
- from typing import Dict, List, Optional, Any
10
-
11
- from agentic_reliability_framework.core.models.event import HealingPolicy, HealingAction, PolicyCondition, ReliabilityEvent
12
-
13
- logger = logging.getLogger(__name__)
14
-
15
-
16
- DEFAULT_HEALING_POLICIES = [
17
- HealingPolicy(
18
- name="high_latency_restart",
19
- conditions=[PolicyCondition(metric="latency_p99", operator="gt", threshold=500.0)],
20
- actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM],
21
- priority=1,
22
- cool_down_seconds=300,
23
- max_executions_per_hour=5
24
- ),
25
- HealingPolicy(
26
- name="critical_error_rate_rollback",
27
- conditions=[PolicyCondition(metric="error_rate", operator="gt", threshold=0.3)],
28
- actions=[HealingAction.ROLLBACK, HealingAction.CIRCUIT_BREAKER, HealingAction.ALERT_TEAM],
29
- priority=1,
30
- cool_down_seconds=600,
31
- max_executions_per_hour=3
32
- ),
33
- HealingPolicy(
34
- name="high_error_rate_traffic_shift",
35
- conditions=[PolicyCondition(metric="error_rate", operator="gt", threshold=0.15)],
36
- actions=[HealingAction.TRAFFIC_SHIFT, HealingAction.ALERT_TEAM],
37
- priority=2,
38
- cool_down_seconds=300,
39
- max_executions_per_hour=5
40
- ),
41
- HealingPolicy(
42
- name="resource_exhaustion_scale",
43
- conditions=[
44
- PolicyCondition(metric="cpu_util", operator="gt", threshold=0.9),
45
- PolicyCondition(metric="memory_util", operator="gt", threshold=0.9)
46
- ],
47
- actions=[HealingAction.SCALE_OUT],
48
- priority=2,
49
- cool_down_seconds=600,
50
- max_executions_per_hour=10
51
- ),
52
- HealingPolicy(
53
- name="moderate_latency_circuit_breaker",
54
- conditions=[PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0)],
55
- actions=[HealingAction.CIRCUIT_BREAKER],
56
- priority=3,
57
- cool_down_seconds=180,
58
- max_executions_per_hour=8
59
- )
60
- ]
61
-
62
-
63
- class PolicyEngine:
64
- """
65
- Thread‑safe policy engine with cooldown and rate limiting.
66
- Policies are evaluated in priority order. Each policy has:
67
- - conditions (AND logic)
68
- - cooldown per (policy, component)
69
- - rate limit per hour
70
- The engine maintains an LRU cache of last execution timestamps
71
- (using OrderedDict) to bound memory usage.
72
- """
73
-
74
- def __init__(
75
- self,
76
- policies: Optional[List[HealingPolicy]] = None,
77
- max_cooldown_history: int = 10000,
78
- max_execution_history: int = 1000
79
- ):
80
- """
81
- Args:
82
- policies: List of HealingPolicy objects. If None, DEFAULT_HEALING_POLICIES used.
83
- max_cooldown_history: Maximum number of (policy, component) entries to keep.
84
- max_execution_history: Maximum number of timestamps stored per policy for rate limiting.
85
- """
86
- self.policies = policies or DEFAULT_HEALING_POLICIES
87
- self._lock = threading.RLock()
88
- # OrderedDict acts as an LRU cache: last item is most recent.
89
- self.last_execution: OrderedDict[str, float] = OrderedDict()
90
- self.max_cooldown_history = max_cooldown_history
91
- self.execution_timestamps: Dict[str, List[float]] = {}
92
- self.max_execution_history = max_execution_history
93
- self.policies = sorted(self.policies, key=lambda p: p.priority)
94
- logger.info(f"Initialized PolicyEngine with {len(self.policies)} policies")
95
-
96
- def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
97
- """
98
- Evaluate all policies against the event and return the set of actions
99
- triggered (deduplicated). Returns [NO_ACTION] if none triggered.
100
- """
101
- applicable_actions = []
102
- current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
103
- for policy in self.policies:
104
- if not policy.enabled:
105
- continue
106
- policy_key = f"{policy.name}_{event.component}"
107
- with self._lock:
108
- last_exec = self.last_execution.get(policy_key, 0)
109
- if current_time - last_exec < policy.cool_down_seconds:
110
- continue
111
- if self._is_rate_limited(policy_key, policy, current_time):
112
- continue
113
- if self._evaluate_conditions(policy.conditions, event):
114
- applicable_actions.extend(policy.actions)
115
- # Update cooldown
116
- self.last_execution[policy_key] = current_time
117
- self.last_execution.move_to_end(policy_key) # mark as most recent
118
- # Enforce cache size
119
- if len(self.last_execution) > self.max_cooldown_history:
120
- self.last_execution.popitem(last=False)
121
- self._record_execution(policy_key, current_time)
122
-
123
- # Deduplicate actions while preserving order
124
- seen = set()
125
- unique = []
126
- for a in applicable_actions:
127
- if a not in seen:
128
- seen.add(a)
129
- unique.append(a)
130
- return unique if unique else [HealingAction.NO_ACTION]
131
-
132
- def _evaluate_conditions(self, conditions: List[PolicyCondition], event: ReliabilityEvent) -> bool:
133
- """Return True if all conditions are satisfied."""
134
- for cond in conditions:
135
- metric = cond.metric
136
- op = cond.operator
137
- thresh = cond.threshold
138
- val = getattr(event, metric, None)
139
- if val is None:
140
- return False
141
- if op == "gt":
142
- if not (val > thresh):
143
- return False
144
- elif op == "lt":
145
- if not (val < thresh):
146
- return False
147
- elif op == "eq":
148
- if not (abs(val - thresh) < 1e-6):
149
- return False
150
- elif op == "gte":
151
- if not (val >= thresh):
152
- return False
153
- elif op == "lte":
154
- if not (val <= thresh):
155
- return False
156
- else:
157
- return False
158
- return True
159
-
160
- def _is_rate_limited(self, key: str, policy: HealingPolicy, now: float) -> bool:
161
- """Check if the policy has exceeded its hourly execution limit."""
162
- if key not in self.execution_timestamps:
163
- return False
164
- one_hour_ago = now - 3600
165
- recent = [ts for ts in self.execution_timestamps[key] if ts > one_hour_ago]
166
- self.execution_timestamps[key] = recent
167
- return len(recent) >= policy.max_executions_per_hour
168
-
169
- def _record_execution(self, key: str, ts: float):
170
- """Record an execution timestamp for rate limiting."""
171
- if key not in self.execution_timestamps:
172
- self.execution_timestamps[key] = []
173
- self.execution_timestamps[key].append(ts)
174
- if len(self.execution_timestamps[key]) > self.max_execution_history:
175
- self.execution_timestamps[key] = self.execution_timestamps[key][-self.max_execution_history:]
176
-
177
- # ========== NEW: Simplified evaluation for demo ==========
178
- def evaluate(self, event_type: str, severity: str, component: str) -> List[Dict[str, Any]]:
179
- """
180
- Simplified policy evaluation for the governance demo.
181
- Returns a list of recommended actions based on severity and event type.
182
- Each action is a dict with keys: policy, action, reason.
183
- """
184
- actions = []
185
- if severity == "critical":
186
- actions.append({
187
- "policy": "POL-002",
188
- "action": "isolate_affected",
189
- "reason": "Critical failure detected"
190
- })
191
- elif severity == "high":
192
- actions.append({
193
- "policy": "POL-004",
194
- "action": "require_approval",
195
- "reason": "High risk"
196
- })
197
- elif severity == "medium" and event_type == "text_generation":
198
- actions.append({
199
- "policy": "POL-001",
200
- "action": "regenerate",
201
- "reason": "Low confidence"
202
- })
203
- # You can add more rules based on component or event_type as needed
204
- return actions