Agentic-Reliability-Framework-v4 / robotics_diagnostician.py
petter2025's picture
Update robotics_diagnostician.py
4a6c22e verified
raw
history blame
4.01 kB
"""
Root cause analysis for robotic system failures.
Works with IoTEvent objects that contain temperature, vibration, motor_current, position_error.
"""
import logging
from typing import Dict, Any, Optional
from agentic_reliability_framework.runtime.agents.base import BaseAgent, AgentSpecialization
from iot_event import IoTEvent # Use the specific event type (though base ReliabilityEvent also works)
logger = logging.getLogger(__name__)
class RoboticsDiagnostician(BaseAgent):
"""
Diagnoses robotic system failures by comparing sensor readings against thresholds.
Intended to be used with IoTEvent instances.
"""
def __init__(self, thresholds: Optional[Dict[str, float]] = None):
"""
Args:
thresholds: Optional dict overriding default sensor thresholds.
Keys: temperature, vibration, motor_current, position_error.
"""
super().__init__(AgentSpecialization.DIAGNOSTICIAN)
self.thresholds = thresholds or {
'temperature': 40.0,
'vibration': 0.3,
'motor_current': 3.5,
'position_error': 0.05
}
async def analyze(self, event: IoTEvent) -> Dict[str, Any]:
"""
Analyze sensor readings and return diagnosis.
Args:
event: An IoTEvent containing sensor fields.
Returns:
Dictionary with keys:
- specialization: 'robotics'
- confidence: float (0‑1)
- findings: dict with 'exceeded_thresholds' and 'likely_root_causes'
- recommendations: list of strings
"""
try:
# Extract metrics (safe even if event is base ReliabilityEvent without these fields)
temp = getattr(event, 'temperature', None)
vib = getattr(event, 'vibration', None)
curr = getattr(event, 'motor_current', None)
pos_err = getattr(event, 'position_error', None)
if temp is None:
# Event does not contain IoT fields – cannot diagnose
return {
'specialization': 'robotics',
'confidence': 0.0,
'findings': {},
'recommendations': []
}
# Detect which thresholds are exceeded
flags = []
if temp > self.thresholds['temperature']:
flags.append('overheat')
if vib > self.thresholds['vibration']:
flags.append('excess_vibration')
if curr > self.thresholds['motor_current']:
flags.append('overcurrent')
if pos_err > self.thresholds['position_error']:
flags.append('position_drift')
# Determine root cause based on combination
causes = []
if 'overheat' in flags and 'overcurrent' in flags:
causes.append('motor_stall')
elif 'overheat' in flags:
causes.append('cooling_failure')
elif 'excess_vibration' in flags:
causes.append('bearing_wear')
elif 'position_drift' in flags:
causes.append('encoder_misalignment')
else:
causes.append('normal_operation')
return {
'specialization': 'robotics',
'confidence': 0.8 if flags else 0.0,
'findings': {
'exceeded_thresholds': flags,
'likely_root_causes': causes,
},
'recommendations': [
f"Check {cause}" for cause in causes
] if flags else []
}
except Exception as e:
logger.error(f"RoboticsDiagnostician error: {e}", exc_info=True)
return {
'specialization': 'robotics',
'confidence': 0.0,
'findings': {},
'recommendations': []
}