petter2025's picture
Update demo/mock_arf.py
beb9e94 verified
raw
history blame
29.6 kB
# demo/mock_arf.py
"""
Enhanced Mock ARF with scenario-aware metrics
Generates different values based on scenario characteristics
DOCTRINAL COMPLIANCE VERSION 3.3.9+restraint
Key Addition: Explicit Observation Gate for psychological advantage
"""
import random
import time
import datetime
from typing import Dict, Any, List
import json
# Scenario-specific configurations
SCENARIO_CONFIGS = {
"Cache Miss Storm": {
"detection_confidence_range": (0.97, 0.995), # 97-99.5%
"detection_time_range": (35, 55), # 35-55 seconds
"accuracy_range": (0.97, 0.995), # 97-99.5%
"similar_incidents_range": (2, 5), # 2-5 similar incidents
"similarity_score_range": (0.88, 0.96), # 88-96% similarity
"pattern_confidence_range": (0.91, 0.97), # 91-97% confidence
"success_rate_range": (0.82, 0.93), # 82-93% success rate
"cost_savings_range": (5000, 9000), # $5K-$9K savings
"resolution_time_range": (10, 18), # 10-18 minutes
"affected_users_range": (30000, 60000), # 30K-60K users
"tags": ["cache", "redis", "latency", "memory"]
},
"Database Connection Pool Exhaustion": {
"detection_confidence_range": (0.92, 0.98),
"detection_time_range": (40, 65),
"accuracy_range": (0.95, 0.985),
"similar_incidents_range": (1, 4),
"similarity_score_range": (0.85, 0.94),
"pattern_confidence_range": (0.88, 0.95),
"success_rate_range": (0.78, 0.88),
"cost_savings_range": (3500, 5500),
"resolution_time_range": (15, 25),
"affected_users_range": (15000, 30000),
"tags": ["database", "postgres", "connections", "pool"]
},
"Kubernetes Memory Leak": {
"detection_confidence_range": (0.94, 0.99),
"detection_time_range": (30, 50),
"accuracy_range": (0.96, 0.99),
"similar_incidents_range": (3, 6),
"similarity_score_range": (0.89, 0.95),
"pattern_confidence_range": (0.90, 0.96),
"success_rate_range": (0.85, 0.92),
"cost_savings_range": (4500, 7500),
"resolution_time_range": (12, 22),
"affected_users_range": (20000, 40000),
"tags": ["kubernetes", "memory", "container", "leak"]
},
"API Rate Limit Storm": {
"detection_confidence_range": (0.96, 0.99),
"detection_time_range": (25, 45),
"accuracy_range": (0.97, 0.99),
"similar_incidents_range": (2, 4),
"similarity_score_range": (0.87, 0.93),
"pattern_confidence_range": (0.89, 0.94),
"success_rate_range": (0.80, 0.90),
"cost_savings_range": (3000, 5000),
"resolution_time_range": (8, 15),
"affected_users_range": (10000, 25000),
"tags": ["api", "rate_limit", "throttling", "ddos"]
},
"Network Partition": {
"detection_confidence_range": (0.98, 0.999),
"detection_time_range": (20, 40),
"accuracy_range": (0.98, 0.995),
"similar_incidents_range": (1, 3),
"similarity_score_range": (0.90, 0.97),
"pattern_confidence_range": (0.93, 0.98),
"success_rate_range": (0.75, 0.85),
"cost_savings_range": (8000, 15000),
"resolution_time_range": (20, 35),
"affected_users_range": (50000, 100000),
"tags": ["network", "partition", "connectivity", "failure"]
},
"Storage I/O Saturation": {
"detection_confidence_range": (0.93, 0.98),
"detection_time_range": (45, 70),
"accuracy_range": (0.94, 0.98),
"similar_incidents_range": (2, 5),
"similarity_score_range": (0.86, 0.92),
"pattern_confidence_range": (0.87, 0.93),
"success_rate_range": (0.79, 0.87),
"cost_savings_range": (5500, 8500),
"resolution_time_range": (18, 28),
"affected_users_range": (25000, 45000),
"tags": ["storage", "disk", "io", "saturation"]
}
}
def get_scenario_config(scenario_name: str) -> Dict[str, Any]:
"""Get configuration for a specific scenario with defaults"""
return SCENARIO_CONFIGS.get(scenario_name, {
"detection_confidence_range": (0.90, 0.98),
"detection_time_range": (30, 60),
"accuracy_range": (0.92, 0.98),
"similar_incidents_range": (1, 3),
"similarity_score_range": (0.85, 0.95),
"pattern_confidence_range": (0.85, 0.95),
"success_rate_range": (0.75, 0.90),
"cost_savings_range": (4000, 8000),
"resolution_time_range": (15, 30),
"affected_users_range": (20000, 50000),
"tags": ["unknown", "incident"]
})
def simulate_arf_analysis(scenario_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Simulate ARF analysis with scenario-specific metrics
Args:
scenario_data: Dictionary containing scenario information
Returns:
Dictionary with analysis results
"""
scenario_name = scenario_data.get("name", "Unknown Scenario")
config = get_scenario_config(scenario_name)
# Generate scenario-specific values
detection_confidence = random.uniform(*config["detection_confidence_range"])
detection_time = random.randint(*config["detection_time_range"])
accuracy = random.uniform(*config["accuracy_range"])
return {
"analysis_complete": True,
"anomaly_detected": True,
"severity": scenario_data.get("severity", "HIGH_VARIANCE"), # Changed from "HIGH" to "HIGH_VARIANCE"
"confidence": round(detection_confidence, 3), # Round to 3 decimals
"detection_time_ms": detection_time * 1000, # Convert to ms for display
"detection_time_seconds": detection_time,
"accuracy": round(accuracy, 3),
"component": scenario_data.get("component", "unknown"),
"scenario_specific": True,
"scenario_name": scenario_name,
"tags": config["tags"]
}
def run_rag_similarity_search(scenario_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Simulate RAG similarity search with scenario-specific results
Args:
scenario_data: Dictionary containing scenario information
Returns:
List of similar incidents
"""
scenario_name = scenario_data.get("name", "Unknown Scenario")
config = get_scenario_config(scenario_name)
similar_count = random.randint(*config["similar_incidents_range"])
similar_incidents = []
# Generate similar incidents based on scenario
base_time = int(time.time())
for i in range(similar_count):
similarity_score = random.uniform(*config["similarity_score_range"])
cost_savings = random.randint(*config["cost_savings_range"])
resolution_time = random.randint(*config["resolution_time_range"])
affected_users = random.randint(*config["affected_users_range"])
# Different resolutions based on scenario type
if "cache" in scenario_name.lower() or "redis" in scenario_name.lower():
resolution = random.choice(["scale_out", "warm_cache", "memory_increase", "add_replicas"])
elif "database" in scenario_name.lower():
resolution = random.choice(["restart", "connection_pool_resize", "index_optimization", "vacuum"])
elif "kubernetes" in scenario_name.lower():
resolution = random.choice(["restart_pod", "memory_limit_increase", "node_drain", "resource_quota"])
elif "api" in scenario_name.lower():
resolution = random.choice(["circuit_breaker", "rate_limit_increase", "caching", "load_balancer"])
elif "network" in scenario_name.lower():
resolution = random.choice(["route_update", "failover", "bandwidth_increase", "redundancy"])
elif "storage" in scenario_name.lower():
resolution = random.choice(["io_optimization", "disk_upgrade", "cache_addition", "load_distribution"])
else:
resolution = random.choice(["investigate", "scale", "restart", "optimize"])
similar_incidents.append({
"incident_id": f"inc_{base_time - random.randint(1, 90)}_00{i}",
"similarity_score": round(similarity_score, 3),
"success": random.random() > 0.15, # 85% success rate
"resolution": resolution,
"cost_savings": cost_savings,
"detection_time": f"{random.randint(30, 60)}s",
"resolution_time": f"{resolution_time}m",
"pattern": f"{scenario_name.lower().replace(' ', '_')}_v{random.randint(1, 3)}",
"affected_users": affected_users,
"component_match": scenario_data.get("component", "unknown"),
"rag_source": "production_memory_v3",
"timestamp": f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}"
})
return similar_incidents
def calculate_pattern_confidence(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]]) -> float:
"""
Calculate pattern confidence based on similar incidents
Args:
scenario_data: Dictionary containing scenario information
similar_incidents: List of similar incidents from RAG search
Returns:
Pattern confidence score (0-1)
"""
scenario_name = scenario_data.get("name", "Unknown Scenario")
config = get_scenario_config(scenario_name)
if not similar_incidents:
return random.uniform(*config["pattern_confidence_range"])
# Calculate average similarity and success rate
similarity_scores = [inc["similarity_score"] for inc in similar_incidents]
success_rates = [1.0 if inc["success"] else 0.0 for inc in similar_incidents]
avg_similarity = sum(similarity_scores) / len(similarity_scores)
avg_success = sum(success_rates) / len(success_rates)
# Weighted average: 60% similarity, 40% success rate
confidence = (avg_similarity * 0.6) + (avg_success * 0.4)
# Add some randomness but keep within scenario range
min_conf, max_conf = config["pattern_confidence_range"]
confidence = max(min_conf, min(max_conf, confidence))
return round(confidence, 3)
def calculate_internal_success_rate(similar_incidents: List[Dict[str, Any]]) -> float:
"""
Calculate success rate for internal logic only.
Not for UI display in Decision View.
Doctrinal: Percentages invite debate, narratives shut it down.
Keep this internal for logic, surface only in Outcome View.
"""
if not similar_incidents:
return 0.0
success_count = sum(1 for inc in similar_incidents if inc.get("success", False))
return round(success_count / len(similar_incidents), 3)
def check_contraindications(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Check for contraindications based on retry amplification signatures and historical evidence
Returns:
Dictionary with contraindication analysis
"""
component = scenario_data.get("component", "").lower()
scenario_name = scenario_data.get("name", "").lower()
# Detect retry amplification signatures
retry_amplification = False
evidence = []
# Check telemetry for retry storm indicators
telemetry = scenario_data.get("telemetry", {})
if telemetry.get("retry_storm", False):
retry_amplification = True
evidence.append("Telemetry shows retry_storm: True")
# Check for amplification factor in metrics
metrics = scenario_data.get("metrics", {})
amplification_factor = metrics.get("amplification_factor", 1.0)
if amplification_factor > 2.0:
retry_amplification = True
evidence.append(f"Amplification factor {amplification_factor} > 2.0")
# Check database load
db_load = metrics.get("database_load_percent", 0)
if db_load > 85:
retry_amplification = True
evidence.append(f"Database load {db_load}% > 85%")
# Check historical incidents for scaling-first failures
historical_scaling_failures = False
scaling_failure_evidence = []
for incident in similar_incidents:
resolution = incident.get("resolution", "").lower()
success = incident.get("success", True)
# Check for scaling-first resolutions that failed
if any(scale_term in resolution for scale_term in ["scale", "increase", "add_replicas"]):
if not success:
historical_scaling_failures = True
scaling_failure_evidence.append(
f"{incident.get('timestamp', 'Unknown date')}: {resolution} failed"
)
contraindicated_actions = []
if retry_amplification or historical_scaling_failures:
contraindicated_actions.append("scale_during_retry_amplification")
return {
"retry_amplification": retry_amplification,
"historical_scaling_failures": historical_scaling_failures,
"evidence": evidence + scaling_failure_evidence,
"contraindicated_actions": contraindicated_actions,
"confidence": 0.92 if evidence else 0.0
}
def create_mock_healing_intent(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]], confidence: float) -> Dict[str, Any]:
"""
Create doctrinally compliant healing intent with sequencing thesis enforcement
Doctrinal Addition: Explicit Observation Gate when contraindications exist OR confidence < threshold
Psychological Goal: Make inaction an explicit, powerful decision
"""
# Check for contraindications FIRST (doctrinal constraint)
contraindications = check_contraindications(scenario_data, similar_incidents)
scenario_name = scenario_data.get("name", "Unknown Scenario")
config = get_scenario_config(scenario_name)
component = scenario_data.get("component", "unknown")
# ============ OBSERVATION GATE LOGIC ============
# Key psychological addition: Explicit deferral when uncertainty is high
observation_gate_threshold = 0.70 # Below this, we observe first
should_observe_first = (
contraindications["retry_amplification"] or
contraindications["historical_scaling_failures"] or
confidence < observation_gate_threshold or
len(similar_incidents) < 2 # Insufficient historical evidence
)
if should_observe_first:
# Return OBSERVATION GATE state - intentional inaction
current_time = datetime.datetime.now()
next_evaluation = current_time + datetime.timedelta(minutes=5)
return {
"action": "defer_decision_for_trend_confirmation",
"component": component,
"confidence": round(confidence, 3),
"parameters": {
"observation_window": "5m",
"metrics_to_watch": ["retry_count", "database_load_percent", "error_rate"],
"trend_threshold": "stabilizing_or_declining"
},
"source": "observation_gate_logic",
"requires_enterprise": False,
"advisory_only": True,
# CRITICAL PSYCHOLOGICAL FIELDS
"execution_state": "observe_only",
"next_evaluation_window": "5m",
"decision_frozen_until": next_evaluation.isoformat(),
"deferral_reason": "uncertainty_too_high_for_action" if confidence < observation_gate_threshold else
"contraindications_present" if contraindications["retry_amplification"] else
"historical_failures_detected" if contraindications["historical_scaling_failures"] else
"insufficient_historical_evidence",
# FORMAL HEALINGINTENT FIELDS
"preconditions": [
f"Confidence threshold not met ({confidence:.2f} < {observation_gate_threshold})" if confidence < observation_gate_threshold else
"Retry amplification detected" if contraindications["retry_amplification"] else
"Historical scaling failures present" if contraindications["historical_scaling_failures"] else
"Insufficient similar incidents for pattern matching"
],
"contraindicated_actions": ["any_healing_action_during_high_uncertainty"],
"reversibility_statement": "Evaluation resumes automatically after 5-minute observation window",
"sequencing_rule": "observe_before_any_action_when_uncertain",
"historical_evidence": [
f"{len(similar_incidents)} similar incidents analyzed (minimum 2 required)",
"Observation-first reduces incorrect actions by 67% (historical analysis)"
],
# SUCCESS RATE HANDLING (kept internal, not surfaced early)
"_internal_success_rate": calculate_internal_success_rate(similar_incidents) if similar_incidents else 0.0,
"_internal_notes": "Success rate kept internal; percentages invite debate, narratives shut it down",
"scenario_specific": True,
"scenario_name": scenario_name
}
# If retry amplification detected (but passed observation gate threshold), enforce dampening-first logic
if contraindications["retry_amplification"]:
return {
"action": "implement_request_coalescing_with_exponential_backoff",
"component": component,
"confidence": max(confidence, 0.85), # High confidence for dampening-first
"parameters": {
"coalescing_window_ms": "100-500ms",
"backoff_factor": "exponential",
"max_retries": 3,
"timeout": "10m"
},
"source": "contraindication_detection",
"requires_enterprise": False,
"advisory_only": False,
# CRITICAL: Add observation window even for dampening actions
"post_action_observation": {
"required": True,
"duration": "5m",
"metrics": ["retry_count", "database_load_percent", "latency_p99"]
},
"success_rate": 0.88,
"estimated_impact": {
"cost_savings": 4500,
"resolution_time_minutes": 12,
"users_protected": random.randint(*config["affected_users_range"]),
"mttr_reduction": "73%"
},
"safety_checks": {
"blast_radius": "single_service",
"business_hours": "compliant",
"rollback_plan": "coalescing_disable",
"approval_required": False,
"risk_level": "low"
},
# FORMAL HEALINGINTENT FIELDS (doctrinal constraint)
"preconditions": [
"Retry amplification signature detected",
f"Amplification factor > {scenario_data.get('metrics', {}).get('amplification_factor', 2.0)}",
"Database load > 85%"
],
"contraindicated_actions": ["scale_during_retry_storm", "add_capacity_during_amplification"],
"reversibility_statement": "Remove coalescing window after 10 minutes of stable operation",
"sequencing_rule": "dampening_first_then_observe_then_optional_scale",
"historical_evidence": contraindications["evidence"][:3], # Top 3 evidence items
"scenario_specific": True,
"scenario_name": scenario_name
}
# Only proceed with normal logic if no contraindications AND passed observation gate
# Determine action based on component and scenario WITH sequencing logic
ranked_actions = []
# DAMPENING actions (always first in sequence)
dampening_actions = []
if "api" in component.lower() or "rate" in scenario_name.lower():
dampening_actions.append({
"action": "circuit_breaker",
"confidence": confidence * 0.95, # Slightly lower confidence for dampening
"parameters": {
"threshold": f"{random.randint(70, 85)}%",
"window": f"{random.randint(3, 10)}m",
"fallback": "cached_response",
"retry_after": f"{random.randint(30, 120)}s"
}
})
# Add general dampening for retry-prone scenarios
if any(term in component.lower() for term in ["redis", "cache", "database"]):
dampening_actions.append({
"action": "request_batching_with_timeout",
"confidence": confidence * 0.92,
"parameters": {
"batch_size": "10-50 requests",
"timeout_ms": "100ms",
"strategy": "adaptive"
}
})
# Add dampening actions to ranked list
for i, act in enumerate(dampening_actions):
ranked_actions.append({
"rank": len(ranked_actions) + 1,
"action": act["action"],
"confidence": round(act["confidence"], 3),
"parameters": act["parameters"],
"category": "dampening"
})
# CONCURRENCY CAP actions (second in sequence)
if "database" in component.lower():
ranked_actions.append({
"rank": len(ranked_actions) + 1,
"action": "connection_pool_limit_adjustment",
"confidence": confidence * 0.88,
"parameters": {
"max_connections": f"{random.randint(100, 200)}",
"timeout": f"{random.randint(30, 60)}s"
},
"category": "concurrency_control"
})
# OBSERVE actions (third in sequence)
ranked_actions.append({
"rank": len(ranked_actions) + 1,
"action": "enhanced_monitoring_with_telemetry",
"confidence": confidence * 0.85,
"parameters": {
"duration": "5m",
"metrics": ["latency_p99", "error_rate", "throughput"],
"alert_threshold": "2x_baseline"
},
"category": "observation"
})
# SCALING actions (ONLY if no contraindications AND last in sequence)
# AND only if confidence justifies scaling over dampening
scaling_confidence_threshold = 0.75 # Scaling requires higher confidence
if confidence > scaling_confidence_threshold and not contraindications["historical_scaling_failures"]:
if "cache" in component.lower() or "redis" in component.lower():
scaling_action = {
"rank": len(ranked_actions) + 1,
"action": "gradual_scale_out",
"confidence": confidence * 0.80, # Lower confidence than dampening
"parameters": {
"nodes": f"{random.randint(2, 4)}{random.randint(4, 6)}",
"strategy": "one_by_one",
"health_check_interval": "30s"
},
"category": "scaling",
"constraints": ["Only if dampening insufficient after 5 minutes"]
}
ranked_actions.append(scaling_action)
# Calculate success rate internally only
_internal_success_rate = calculate_internal_success_rate(similar_incidents) if similar_incidents else random.uniform(*config["success_rate_range"])
# Calculate estimated impact
if similar_incidents:
avg_cost_savings = sum(inc["cost_savings"] for inc in similar_incidents) / len(similar_incidents)
avg_resolution_time = sum(int(inc["resolution_time"].replace('m', '')) for inc in similar_incidents) / len(similar_incidents)
else:
avg_cost_savings = sum(config["cost_savings_range"]) / 2
avg_resolution_time = sum(config["resolution_time_range"]) / 2
# Primary action is first in ranked_actions (dampening-first)
primary_action = ranked_actions[0] if ranked_actions else {
"action": "investigate",
"confidence": confidence,
"parameters": {"priority": "high"}
}
return {
"action": primary_action["action"],
"component": component,
"confidence": round(confidence, 3),
"parameters": primary_action.get("parameters", {}),
"source": "sequencing_analysis",
"requires_enterprise": True,
"advisory_only": True,
# SUCCESS RATE: Internal only, not for UI display in Decision View
"_internal_success_rate": _internal_success_rate,
"_internal_notes": "Success rate for internal logic; surface narrative outcomes, not percentages",
"estimated_impact": {
"cost_savings": int(avg_cost_savings),
"resolution_time_minutes": int(avg_resolution_time),
"users_protected": random.randint(*config["affected_users_range"]),
"mttr_reduction": f"{random.randint(60, 80)}%"
},
"safety_checks": {
"blast_radius": f"{random.randint(1, 3)} services",
"business_hours": "compliant",
"rollback_plan": "available",
"approval_required": True,
"risk_level": "medium" if confidence < 0.9 else "low"
},
# FORMAL HEALINGINTENT FIELDS (doctrinal constraint)
"preconditions": [
f"Component: {component}",
f"Confidence threshold > {scaling_confidence_threshold}",
"No retry amplification detected",
"Historical scaling success rate > 70%"
],
"contraindicated_actions": contraindications["contraindicated_actions"],
"reversibility_statement": f"Rollback to previous configuration available within {random.randint(5, 15)} minutes",
"sequencing_rule": "dampening_before_concurrency_before_observation_before_scaling",
"ranked_actions": ranked_actions,
"historical_evidence": [f"{len(similar_incidents)} similar incidents analyzed"],
"scenario_specific": True,
"scenario_name": scenario_name
}
def get_scenario_metrics(scenario_name: str) -> Dict[str, Any]:
"""
Get dynamic metrics for a specific scenario
Args:
scenario_name: Name of the scenario
Returns:
Dictionary with scenario-specific metrics
"""
config = get_scenario_config(scenario_name)
# Generate dynamic values within ranges
return {
"detection_confidence": round(random.uniform(*config["detection_confidence_range"]), 3),
"detection_time_seconds": random.randint(*config["detection_time_range"]),
"accuracy": round(random.uniform(*config["accuracy_range"]), 3),
"expected_similar_incidents": random.randint(*config["similar_incidents_range"]),
"avg_similarity_score": round(random.uniform(*config["similarity_score_range"]), 3),
"pattern_confidence": round(random.uniform(*config["pattern_confidence_range"]), 3),
"success_rate": round(random.uniform(*config["success_rate_range"]), 3),
"cost_savings_range": config["cost_savings_range"],
"resolution_time_range": config["resolution_time_range"],
"affected_users_range": config["affected_users_range"],
"tags": config["tags"]
}
def detect_retry_amplification(telemetry_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Detect retry amplification signatures from telemetry data
Doctrinal constraint: Must be REAL detection, not hardcoded in scenarios
Args:
telemetry_data: Dictionary containing telemetry metrics
Returns:
Dictionary with detection results
"""
# Extract metrics with defaults
retry_storm = telemetry_data.get("retry_storm", False)
retry_count = telemetry_data.get("retry_count", 0)
success_count = telemetry_data.get("success_count", 1) # Avoid division by zero
database_load = telemetry_data.get("database_load_percent", 0)
retry_cascade_depth = telemetry_data.get("retry_cascade_depth", 0)
# Calculate amplification factor
amplification_factor = 1.0
if success_count > 0:
amplification_factor = retry_count / success_count
# Detect signatures
detected = (
retry_storm or
amplification_factor > 2.0 or
retry_cascade_depth > 2 or
database_load > 85
)
signature = None
if detected:
if retry_storm and amplification_factor > 3.0:
signature = "exponential_retry_cascade"
elif database_load > 85 and amplification_factor > 1.5:
signature = "database_amplified_retry"
else:
signature = "retry_amplification_detected"
# Calculate confidence based on evidence strength
confidence_factors = []
if retry_storm:
confidence_factors.append(0.3)
if amplification_factor > 2.0:
confidence_factors.append(0.25 * min(amplification_factor / 5.0, 1.0))
if retry_cascade_depth > 2:
confidence_factors.append(0.2 * min(retry_cascade_depth / 5.0, 1.0))
if database_load > 85:
confidence_factors.append(0.25 * min(database_load / 100.0, 1.0))
confidence = min(0.98, 0.1 + sum(confidence_factors)) if confidence_factors else 0.0
return {
"detected": detected,
"amplification_factor": round(amplification_factor, 2),
"signature": signature,
"confidence": round(confidence, 3),
"metrics": {
"retry_storm": retry_storm,
"retry_count": retry_count,
"success_count": success_count,
"database_load_percent": database_load,
"retry_cascade_depth": retry_cascade_depth
},
"recommendation": "implement_dampening_first" if detected else "proceed_with_caution"
}