| """ |
| Root cause analysis for robotic system failures. |
| Works with IoTEvent objects that contain temperature, vibration, motor_current, position_error. |
| """ |
|
|
| import logging |
| from typing import Dict, Any, Optional |
| from agentic_reliability_framework.runtime.agents.base import BaseAgent, AgentSpecialization |
| from iot_event import IoTEvent |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class RoboticsDiagnostician(BaseAgent):
    """
    Diagnoses robotic system failures by comparing sensor readings against thresholds.
    Intended to be used with IoTEvent instances.

    A reading counts as exceeded only when it is strictly greater than its
    threshold. Sensors whose reading is missing (``None``) are skipped
    individually instead of invalidating the whole diagnosis.
    """

    # Default upper limit for each sensor; used for any key the caller does
    # not override.
    DEFAULT_THRESHOLDS = {
        'temperature': 40.0,
        'vibration': 0.3,
        'motor_current': 3.5,
        'position_error': 0.05,
    }

    # (event attribute / threshold key, flag raised when the limit is exceeded).
    # The order here fixes the order of entries in 'exceeded_thresholds'.
    _SENSOR_FLAGS = (
        ('temperature', 'overheat'),
        ('vibration', 'excess_vibration'),
        ('motor_current', 'overcurrent'),
        ('position_error', 'position_drift'),
    )

    def __init__(self, thresholds: Optional[Dict[str, float]] = None):
        """
        Args:
            thresholds: Optional dict overriding default sensor thresholds.
                Keys: temperature, vibration, motor_current, position_error.
                Keys that are omitted keep their default value.
        """
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        # Merge rather than replace, so a partial override cannot leave a
        # sensor without a threshold (which would fail every later analysis).
        self.thresholds = {**self.DEFAULT_THRESHOLDS, **(thresholds or {})}

    @staticmethod
    def _empty_result() -> Dict[str, Any]:
        """Return the neutral result used when nothing can be diagnosed."""
        return {
            'specialization': 'robotics',
            'confidence': 0.0,
            'findings': {},
            'recommendations': []
        }

    @staticmethod
    def _root_cause(flags) -> str:
        """Pick the single highest-priority root cause for the raised flags."""
        if 'overheat' in flags and 'overcurrent' in flags:
            return 'motor_stall'
        if 'overheat' in flags:
            return 'cooling_failure'
        if 'excess_vibration' in flags:
            return 'bearing_wear'
        if 'position_drift' in flags:
            return 'encoder_misalignment'
        if 'overcurrent' in flags:
            # Lowest priority so every other flag combination keeps its
            # established cause; without this branch an overcurrent-only
            # event would be labelled 'normal_operation'.
            return 'motor_overload'
        return 'normal_operation'

    async def analyze(self, event: IoTEvent) -> Dict[str, Any]:
        """
        Analyze sensor readings and return diagnosis.

        Args:
            event: An IoTEvent containing sensor fields. Each of temperature,
                vibration, motor_current and position_error is read with
                ``getattr`` and treated as missing when absent or ``None``.

        Returns:
            Dictionary with keys:
                - specialization: 'robotics'
                - confidence: 0.8 when at least one threshold is exceeded,
                  otherwise 0.0
                - findings: dict with 'exceeded_thresholds' and
                  'likely_root_causes'; an empty dict when the event carries
                  no sensor reading at all or an error occurred
                - recommendations: list of strings (empty when nothing is
                  exceeded)

        Any exception raised during analysis is logged and converted into the
        empty result, so this method does not raise.
        """
        try:
            readings = {
                sensor: getattr(event, sensor, None)
                for sensor, _ in self._SENSOR_FLAGS
            }

            # Nothing to diagnose if the event carries no sensor data at all.
            if all(value is None for value in readings.values()):
                return self._empty_result()

            # Skip missing sensors: comparing None with a float raises
            # TypeError, which would otherwise discard the whole diagnosis.
            flags = [
                flag
                for sensor, flag in self._SENSOR_FLAGS
                if readings[sensor] is not None
                and readings[sensor] > self.thresholds[sensor]
            ]

            causes = [self._root_cause(flags)]

            return {
                'specialization': 'robotics',
                'confidence': 0.8 if flags else 0.0,
                'findings': {
                    'exceeded_thresholds': flags,
                    'likely_root_causes': causes,
                },
                'recommendations': [
                    f"Check {cause}" for cause in causes
                ] if flags else []
            }
        except Exception as e:
            # Deliberate best-effort boundary: a malformed event (e.g. a
            # non-numeric reading) must not crash the caller.
            logger.exception("RoboticsDiagnostician error: %s", e)
            return self._empty_result()