petter2025 commited on
Commit
ee7d63b
·
verified ·
1 Parent(s): 88e1d47

Create utils/arf_engine.py

Browse files
Files changed (1) hide show
  1. utils/arf_engine.py +580 -0
utils/arf_engine.py ADDED
@@ -0,0 +1,580 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ ARF 3.3.9 Engine - PhD Level Implementation
3
+ Realistic scoring, psychological framing, enterprise simulation
4
+ """
5
+
6
+ import random
7
+ import time
8
+ from datetime import datetime
9
+ from typing import Dict, List, Tuple
10
+ import numpy as np
11
+
12
+ class BayesianRiskModel:
13
+ """Bayesian risk assessment with priors and confidence intervals"""
14
+
15
+ def __init__(self):
16
+ # Prior distributions for different action types
17
+ self.priors = {
18
+ "destructive": {"alpha": 2, "beta": 8}, # 20% base risk
19
+ "modification": {"alpha": 1, "beta": 9}, # 10% base risk
20
+ "readonly": {"alpha": 1, "beta": 99}, # 1% base risk
21
+ "deployment": {"alpha": 3, "beta": 7}, # 30% base risk
22
+ }
23
+
24
+ # Historical patterns
25
+ self.history = {
26
+ "DROP DATABASE": {"success": 5, "failure": 95},
27
+ "DELETE FROM": {"success": 10, "failure": 90},
28
+ "GRANT": {"success": 30, "failure": 70},
29
+ "UPDATE": {"success": 40, "failure": 60},
30
+ "DEPLOY": {"success": 60, "failure": 40},
31
+ }
32
+
33
+ def assess(self, action: str, context: Dict, historical_patterns: Dict = None) -> Dict:
34
+ """Bayesian risk assessment"""
35
+ # Determine action type
36
+ action_type = self._classify_action(action)
37
+
38
+ # Get prior
39
+ prior = self.priors.get(action_type, self.priors["modification"])
40
+
41
+ # Get likelihood from historical data
42
+ action_key = self._extract_action_key(action)
43
+ historical = historical_patterns.get(action_key, {"success": 50, "failure": 50})
44
+
45
+ # Calculate posterior (simplified)
46
+ alpha_posterior = prior["alpha"] + historical["failure"]
47
+ beta_posterior = prior["beta"] + historical["success"]
48
+
49
+ # Expected risk score
50
+ risk_score = alpha_posterior / (alpha_posterior + beta_posterior)
51
+
52
+ # Add context-based adjustments
53
+ context_adjustment = self._assess_context(context)
54
+ risk_score *= context_adjustment
55
+
56
+ # Add realistic variance (never 0.0 or 1.0)
57
+ risk_score = max(0.25, min(0.95, risk_score + random.uniform(-0.1, 0.1)))
58
+
59
+ # Confidence interval
60
+ n = alpha_posterior + beta_posterior
61
+ confidence = min(0.99, 0.8 + (n / (n + 100)) * 0.19)
62
+
63
+ return {
64
+ "score": risk_score,
65
+ "confidence": confidence,
66
+ "action_type": action_type,
67
+ "risk_factors": self._extract_risk_factors(action, context)
68
+ }
69
+
70
+ def _classify_action(self, action: str) -> str:
71
+ """Classify action type"""
72
+ action_lower = action.lower()
73
+ if any(word in action_lower for word in ["drop", "delete", "truncate", "remove"]):
74
+ return "destructive"
75
+ elif any(word in action_lower for word in ["update", "alter", "modify", "change"]):
76
+ return "modification"
77
+ elif any(word in action_lower for word in ["deploy", "execute", "run", "train"]):
78
+ return "deployment"
79
+ elif any(word in action_lower for word in ["grant", "revoke", "permission"]):
80
+ return "modification"
81
+ else:
82
+ return "readonly"
83
+
84
+ def _extract_action_key(self, action: str) -> str:
85
+ """Extract key action identifier"""
86
+ words = action.split()
87
+ if len(words) > 0:
88
+ return words[0].upper()
89
+ return "UNKNOWN"
90
+
91
+ def _assess_context(self, context: Dict) -> float:
92
+ """Assess context risk multiplier"""
93
+ multiplier = 1.0
94
+ context_str = str(context).lower()
95
+
96
+ # Time-based risk
97
+ if "2am" in context_str or "night" in context_str:
98
+ multiplier *= 1.3
99
+
100
+ # User-based risk
101
+ if "junior" in context_str or "intern" in context_str:
102
+ multiplier *= 1.4
103
+ elif "senior" in context_str or "lead" in context_str:
104
+ multiplier *= 0.8
105
+
106
+ # Environment-based risk
107
+ if "production" in context_str or "prod" in context_str:
108
+ multiplier *= 1.5
109
+ elif "staging" in context_str:
110
+ multiplier *= 1.2
111
+ elif "development" in context_str:
112
+ multiplier *= 0.7
113
+
114
+ # Backup status
115
+ if "backup" in context_str and ("old" in context_str or "no" in context_str):
116
+ multiplier *= 1.4
117
+ elif "backup" in context_str and ("fresh" in context_str or "recent" in context_str):
118
+ multiplier *= 0.9
119
+
120
+ return multiplier
121
+
122
+ def _extract_risk_factors(self, action: str, context: Dict) -> List[str]:
123
+ """Extract specific risk factors"""
124
+ factors = []
125
+ action_lower = action.lower()
126
+ context_str = str(context).lower()
127
+
128
+ if "drop" in action_lower and "database" in action_lower:
129
+ factors.append("Irreversible data destruction")
130
+ factors.append("Potential service outage")
131
+
132
+ if "delete" in action_lower:
133
+ factors.append("Data loss risk")
134
+ if "where" not in action_lower:
135
+ factors.append("No WHERE clause (mass deletion)")
136
+
137
+ if "production" in context_str:
138
+ factors.append("Production environment")
139
+
140
+ if "junior" in context_str:
141
+ factors.append("Junior operator")
142
+
143
+ if "2am" in context_str:
144
+ factors.append("Off-hours operation")
145
+
146
+ return factors[:3] # Return top 3 factors
147
+
148
+ class PolicyEngine:
149
+ """Hierarchical policy evaluation engine"""
150
+
151
+ def __init__(self):
152
+ self.policies = {
153
+ "destructive": {
154
+ "risk_threshold": 0.3,
155
+ "required_approvals": 2,
156
+ "backup_required": True
157
+ },
158
+ "modification": {
159
+ "risk_threshold": 0.5,
160
+ "required_approvals": 1,
161
+ "backup_required": False
162
+ },
163
+ "deployment": {
164
+ "risk_threshold": 0.4,
165
+ "required_approvals": 1,
166
+ "tests_required": True
167
+ },
168
+ "readonly": {
169
+ "risk_threshold": 0.8,
170
+ "required_approvals": 0,
171
+ "backup_required": False
172
+ }
173
+ }
174
+
175
+ def evaluate(self, action: str, risk_profile: Dict, confidence_threshold: float = 0.7) -> Dict:
176
+ """Evaluate action against policies"""
177
+ action_type = risk_profile.get("action_type", "modification")
178
+ risk_score = risk_profile.get("score", 0.5)
179
+
180
+ policy = self.policies.get(action_type, self.policies["modification"])
181
+
182
+ # Policy compliance check
183
+ if risk_score > policy["risk_threshold"]:
184
+ compliance = "HIGH_RISK"
185
+ recommendation = f"Requires {policy['required_approvals']} approval(s)"
186
+ if policy.get("backup_required", False):
187
+ recommendation += " and verified backup"
188
+ else:
189
+ compliance = "WITHIN_POLICY"
190
+ recommendation = "Within policy limits"
191
+
192
+ # Confidence check
193
+ confidence = risk_profile.get("confidence", 0.5)
194
+ if confidence < confidence_threshold:
195
+ compliance = "LOW_CONFIDENCE"
196
+ recommendation = "Low confidence score - manual review recommended"
197
+
198
+ return {
199
+ "compliance": compliance,
200
+ "recommendation": recommendation,
201
+ "policy_type": action_type,
202
+ "risk_threshold": policy["risk_threshold"],
203
+ "actual_risk": risk_score
204
+ }
205
+
206
+ class LicenseManager:
207
+ """Psychology-enhanced license manager"""
208
+
209
+ def __init__(self):
210
+ self.license_patterns = {
211
+ "trial": r"ARF-TRIAL-[A-Z0-9]{8}",
212
+ "starter": r"ARF-STARTER-[A-Z0-9]{8}",
213
+ "professional": r"ARF-PRO-[A-Z0-9]{8}",
214
+ "enterprise": r"ARF-ENTERPRISE-[A-Z0-9]{8}"
215
+ }
216
+
217
+ self.tier_features = {
218
+ "oss": {
219
+ "name": "OSS Edition",
220
+ "color": "#1E88E5",
221
+ "enforcement": "advisory",
222
+ "gates": 0,
223
+ "support": "community"
224
+ },
225
+ "trial": {
226
+ "name": "Trial Edition",
227
+ "color": "#FFB300",
228
+ "enforcement": "mechanical",
229
+ "gates": 3,
230
+ "support": "email",
231
+ "days_remaining": 14
232
+ },
233
+ "starter": {
234
+ "name": "Starter Edition",
235
+ "color": "#FF9800",
236
+ "enforcement": "mechanical",
237
+ "gates": 3,
238
+ "support": "business_hours",
239
+ "price": "$2,000/mo"
240
+ },
241
+ "professional": {
242
+ "name": "Professional Edition",
243
+ "color": "#FF6F00",
244
+ "enforcement": "mechanical",
245
+ "gates": 5,
246
+ "support": "24/7",
247
+ "price": "$5,000/mo"
248
+ },
249
+ "enterprise": {
250
+ "name": "Enterprise Edition",
251
+ "color": "#D84315",
252
+ "enforcement": "mechanical",
253
+ "gates": 7,
254
+ "support": "dedicated",
255
+ "price": "$15,000/mo"
256
+ }
257
+ }
258
+
259
+ def validate(self, license_key: str = None, action_risk: float = 0.5) -> Dict:
260
+ """Validate license and return tier info"""
261
+ if not license_key:
262
+ return self.tier_features["oss"]
263
+
264
+ # Check license patterns
265
+ license_upper = license_key.upper()
266
+
267
+ if "ARF-TRIAL" in license_upper:
268
+ tier = "trial"
269
+ elif "ARF-STARTER" in license_upper:
270
+ tier = "starter"
271
+ elif "ARF-PRO" in license_upper:
272
+ tier = "professional"
273
+ elif "ARF-ENTERPRISE" in license_upper:
274
+ tier = "enterprise"
275
+ else:
276
+ tier = "oss"
277
+
278
+ # Get tier features
279
+ features = self.tier_features.get(tier, self.tier_features["oss"]).copy()
280
+
281
+ # Add psychological elements
282
+ if tier == "trial":
283
+ features["scarcity"] = f"⏳ {features.get('days_remaining', 14)} days remaining"
284
+ features["social_proof"] = "Join 1,000+ developers using ARF"
285
+
286
+ return features
287
+
288
+ class MechanicalGateEvaluator:
289
+ """Mechanical gate evaluation engine"""
290
+
291
+ def __init__(self):
292
+ self.gates = {
293
+ "risk_assessment": {"weight": 0.3, "required": True},
294
+ "policy_compliance": {"weight": 0.3, "required": True},
295
+ "resource_check": {"weight": 0.2, "required": False},
296
+ "approval_workflow": {"weight": 0.1, "required": False},
297
+ "audit_trail": {"weight": 0.1, "required": False}
298
+ }
299
+
300
+ def evaluate(self, risk_profile: Dict, policy_result: Dict, license_info: Dict) -> Dict:
301
+ """Evaluate mechanical gates"""
302
+ gate_results = []
303
+ total_score = 0
304
+ max_score = 0
305
+
306
+ # Gate 1: Risk Assessment
307
+ risk_gate = self._evaluate_risk_gate(risk_profile)
308
+ gate_results.append(risk_gate)
309
+ total_score += risk_gate["score"] * self.gates["risk_assessment"]["weight"]
310
+ max_score += self.gates["risk_assessment"]["weight"]
311
+
312
+ # Gate 2: Policy Compliance
313
+ policy_gate = self._evaluate_policy_gate(policy_result)
314
+ gate_results.append(policy_gate)
315
+ total_score += policy_gate["score"] * self.gates["policy_compliance"]["weight"]
316
+ max_score += self.gates["policy_compliance"]["weight"]
317
+
318
+ # Additional gates based on license tier
319
+ license_tier = license_info.get("name", "OSS Edition").lower()
320
+
321
+ if "trial" in license_tier or "starter" in license_tier:
322
+ # Gate 3: Resource Check
323
+ resource_gate = self._evaluate_resource_gate(risk_profile)
324
+ gate_results.append(resource_gate)
325
+ total_score += resource_gate["score"] * self.gates["resource_check"]["weight"]
326
+ max_score += self.gates["resource_check"]["weight"]
327
+
328
+ if "professional" in license_tier or "enterprise" in license_tier:
329
+ # Gate 4: Approval Workflow
330
+ approval_gate = self._evaluate_approval_gate(policy_result)
331
+ gate_results.append(approval_gate)
332
+ total_score += approval_gate["score"] * self.gates["approval_workflow"]["weight"]
333
+ max_score += self.gates["approval_workflow"]["weight"]
334
+
335
+ # Gate 5: Audit Trail
336
+ audit_gate = self._evaluate_audit_gate()
337
+ gate_results.append(audit_gate)
338
+ total_score += audit_gate["score"] * self.gates["audit_trail"]["weight"]
339
+ max_score += self.gates["audit_trail"]["weight"]
340
+
341
+ # Calculate overall score
342
+ overall_score = total_score / max_score if max_score > 0 else 0
343
+
344
+ # Decision authority
345
+ decision = self._calculate_decision_authority(gate_results, license_tier, overall_score)
346
+
347
+ return {
348
+ "gate_results": gate_results,
349
+ "overall_score": overall_score,
350
+ "decision": decision,
351
+ "gates_passed": len([g for g in gate_results if g["passed"]]),
352
+ "total_gates": len(gate_results)
353
+ }
354
+
355
+ def _evaluate_risk_gate(self, risk_profile: Dict) -> Dict:
356
+ """Evaluate risk assessment gate"""
357
+ risk_score = risk_profile.get("score", 0.5)
358
+ confidence = risk_profile.get("confidence", 0.5)
359
+
360
+ passed = risk_score < 0.7 and confidence > 0.6
361
+ score = (0.7 - min(risk_score, 0.7)) / 0.7 * 0.5 + (confidence - 0.6) / 0.4 * 0.5
362
+
363
+ return {
364
+ "name": "Risk Assessment",
365
+ "passed": passed,
366
+ "score": max(0, min(1, score)),
367
+ "details": f"Risk: {risk_score:.1%}, Confidence: {confidence:.1%}"
368
+ }
369
+
370
+ def _evaluate_policy_gate(self, policy_result: Dict) -> Dict:
371
+ """Evaluate policy compliance gate"""
372
+ compliance = policy_result.get("compliance", "HIGH_RISK")
373
+ risk_threshold = policy_result.get("risk_threshold", 0.5)
374
+ actual_risk = policy_result.get("actual_risk", 0.5)
375
+
376
+ passed = compliance != "HIGH_RISK"
377
+ score = 1.0 if passed else (risk_threshold / actual_risk if actual_risk > 0 else 0)
378
+
379
+ return {
380
+ "name": "Policy Compliance",
381
+ "passed": passed,
382
+ "score": max(0, min(1, score)),
383
+ "details": f"Compliance: {compliance}"
384
+ }
385
+
386
+ def _evaluate_resource_gate(self, risk_profile: Dict) -> Dict:
387
+ """Evaluate resource check gate"""
388
+ # Simulate resource availability check
389
+ passed = random.random() > 0.3 # 70% chance of passing
390
+ score = 0.8 if passed else 0.3
391
+
392
+ return {
393
+ "name": "Resource Check",
394
+ "passed": passed,
395
+ "score": score,
396
+ "details": "Resources available" if passed else "Resource constraints detected"
397
+ }
398
+
399
+ def _evaluate_approval_gate(self, policy_result: Dict) -> Dict:
400
+ """Evaluate approval workflow gate"""
401
+ # Simulate approval workflow
402
+ passed = random.random() > 0.2 # 80% chance of passing
403
+ score = 0.9 if passed else 0.2
404
+
405
+ return {
406
+ "name": "Approval Workflow",
407
+ "passed": passed,
408
+ "score": score,
409
+ "details": "Approvals verified" if passed else "Pending approvals"
410
+ }
411
+
412
+ def _evaluate_audit_gate(self) -> Dict:
413
+ """Evaluate audit trail gate"""
414
+ # Always passes for demo
415
+ return {
416
+ "name": "Audit Trail",
417
+ "passed": True,
418
+ "score": 1.0,
419
+ "details": "Audit trail generated"
420
+ }
421
+
422
+ def _calculate_decision_authority(self, gate_results: List[Dict], license_tier: str, overall_score: float) -> str:
423
+ """Calculate decision authority"""
424
+ required_gates = [g for g in gate_results if self.gates.get(g["name"].lower().replace(" ", "_"), {}).get("required", False)]
425
+ passed_required = all(g["passed"] for g in required_gates)
426
+
427
+ if not passed_required:
428
+ return "BLOCKED"
429
+
430
+ # Decision thresholds based on license tier
431
+ thresholds = {
432
+ "oss": 1.0, # Never autonomous
433
+ "trial": 0.9,
434
+ "starter": 0.85,
435
+ "professional": 0.8,
436
+ "enterprise": 0.75
437
+ }
438
+
439
+ tier_key = "oss"
440
+ for key in ["trial", "starter", "professional", "enterprise"]:
441
+ if key in license_tier:
442
+ tier_key = key
443
+ break
444
+
445
+ threshold = thresholds.get(tier_key, 1.0)
446
+
447
+ if overall_score >= threshold:
448
+ return "AUTONOMOUS"
449
+ else:
450
+ return "HUMAN_APPROVAL"
451
+
452
+ class ARFEngine:
453
+ """Enterprise-grade reliability engine with psychological optimization"""
454
+
455
+ def __init__(self):
456
+ self.risk_model = BayesianRiskModel()
457
+ self.policy_engine = PolicyEngine()
458
+ self.license_manager = LicenseManager()
459
+ self.gate_evaluator = MechanicalGateEvaluator()
460
+ self.stats = {
461
+ "actions_tested": 0,
462
+ "risks_prevented": 0,
463
+ "time_saved_minutes": 0,
464
+ "trial_requests": 0,
465
+ "start_time": time.time()
466
+ }
467
+ self.history = []
468
+
469
+ def assess_action(self, action: str, context: Dict, license_key: str = None) -> Dict:
470
+ """Comprehensive action assessment with psychological framing"""
471
+ start_time = time.time()
472
+
473
+ # 1. Multi-dimensional risk assessment
474
+ risk_profile = self.risk_model.assess(
475
+ action=action,
476
+ context=context,
477
+ historical_patterns=self.risk_model.history
478
+ )
479
+
480
+ # 2. Policy evaluation with confidence intervals
481
+ policy_result = self.policy_engine.evaluate(
482
+ action=action,
483
+ risk_profile=risk_profile,
484
+ confidence_threshold=0.7
485
+ )
486
+
487
+ # 3. License validation with tier-specific gates
488
+ license_info = self.license_manager.validate(
489
+ license_key,
490
+ action_risk=risk_profile["score"]
491
+ )
492
+
493
+ # 4. Mechanical gate evaluation
494
+ gate_results = self.gate_evaluator.evaluate(
495
+ risk_profile=risk_profile,
496
+ policy_result=policy_result,
497
+ license_info=license_info
498
+ )
499
+
500
+ # 5. Generate recommendation
501
+ recommendation = self._generate_recommendation(
502
+ risk_profile, policy_result, license_info, gate_results
503
+ )
504
+
505
+ # 6. Calculate processing time
506
+ processing_time = (time.time() - start_time) * 1000 # ms
507
+
508
+ # Update statistics
509
+ if risk_profile["score"] > 0.5:
510
+ self.stats["risks_prevented"] += 1
511
+
512
+ # Store in history
513
+ self.history.append({
514
+ "action": action,
515
+ "risk_score": risk_profile["score"],
516
+ "timestamp": datetime.now().isoformat(),
517
+ "license_tier": license_info.get("name", "OSS")
518
+ })
519
+
520
+ # Keep only last 100 entries
521
+ if len(self.history) > 100:
522
+ self.history = self.history[-100:]
523
+
524
+ return {
525
+ "risk_score": risk_profile["score"],
526
+ "risk_factors": risk_profile["risk_factors"],
527
+ "confidence": risk_profile["confidence"],
528
+ "recommendation": recommendation,
529
+ "policy_compliance": policy_result["compliance"],
530
+ "license_tier": license_info["name"],
531
+ "gate_decision": gate_results["decision"],
532
+ "gates_passed": gate_results["gates_passed"],
533
+ "total_gates": gate_results["total_gates"],
534
+ "processing_time_ms": processing_time,
535
+ "stats": self.get_stats()
536
+ }
537
+
538
+ def _generate_recommendation(self, risk_profile: Dict, policy_result: Dict,
539
+ license_info: Dict, gate_results: Dict) -> str:
540
+ """Generate psychological recommendation"""
541
+ risk_score = risk_profile["score"]
542
+ decision = gate_results["decision"]
543
+ tier = license_info["name"]
544
+
545
+ if tier == "OSS Edition":
546
+ if risk_score > 0.7:
547
+ return "🚨 HIGH RISK: This action would be BLOCKED by mechanical gates. Consider Enterprise for protection."
548
+ elif risk_score > 0.4:
549
+ return "⚠️ MODERATE RISK: Requires manual review. Mechanical gates would automate this check."
550
+ else:
551
+ return "✅ LOW RISK: Action appears safe. Mechanical gates provide additional verification."
552
+
553
+ else:
554
+ if decision == "BLOCKED":
555
+ return "❌ BLOCKED: Action prevented by mechanical gates. Risk factors: " + ", ".join(risk_profile["risk_factors"][:2])
556
+ elif decision == "HUMAN_APPROVAL":
557
+ return "🔄 REQUIRES APPROVAL: Action meets risk threshold. Routing to human approver."
558
+ else: # AUTONOMOUS
559
+ return "✅ APPROVED: Action passes all mechanical gates and is proceeding autonomously."
560
+
561
+ def update_stats(self, stat_type: str, value: int = 1):
562
+ """Update statistics"""
563
+ if stat_type in self.stats:
564
+ self.stats[stat_type] += value
565
+
566
+ # Update time saved (15 minutes per action)
567
+ if stat_type == "actions_tested":
568
+ self.stats["time_saved_minutes"] += 15
569
+
570
+ def get_stats(self) -> Dict:
571
+ """Get current statistics"""
572
+ elapsed_hours = (time.time() - self.stats["start_time"]) / 3600
573
+ actions_per_hour = self.stats["actions_tested"] / max(elapsed_hours, 0.1)
574
+
575
+ return {
576
+ **self.stats,
577
+ "actions_per_hour": round(actions_per_hour, 1),
578
+ "reliability_score": min(99.9, 95 + (self.stats["risks_prevented"] / max(self.stats["actions_tested"], 1)) * 5),
579
+ "history_size": len(self.history)
580
+ }