Spaces:
Sleeping
Sleeping
"""
Zone Analysis Module for DCRM Curves

This module analyzes each segmented zone from DCRM graphs and evaluates
the health characteristics based on industry standards for circuit breaker
dynamic contact resistance measurements.

Healthy DCRM Curve Characteristics:
- Smooth resistance profile without excessive spikes
- Gradual resistance drop during arcing contact engagement
- Sharp drop to low, stable resistance (30-80 µΩ) during main contact engagement
- Smooth resistance increase during opening operation
- Minimal oscillations and no high peaks
- Reproducible signature over time
"""
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, List, Tuple, Any | |
class ZoneAnalyzer:
    """Analyzes individual zones of DCRM curves for health assessment.

    The analyzer slices the measurement DataFrame by the time boundaries of
    each zone, runs a zone-specific set of checks, turns the detected issues
    into a 0-100 score per zone, and finally combines the zone scores into a
    weighted overall score.
    """

    # Healthy curve thresholds (based on research)
    HEALTHY_THRESHOLDS = {
        'main_contact_resistance_max': 80,  # µΩ (micro-ohms) - converted to graph units
        'main_contact_resistance_min': 30,  # µΩ
        'max_resistance_spike_ratio': 3.0,  # Max spike should be < 3x baseline
        'max_oscillation_percentage': 15,   # Max 15% oscillation in stable zones
        'smoothness_threshold': 0.85,       # Correlation coefficient for smoothness
        'current_rise_rate_min': 0.5,       # Minimum rate of current rise in Zone 1
        'travel_stability_threshold': 5,    # Max variation in travel during conduction
    }

    # Weight of each zone in the overall score (Zone 3 is most important)
    ZONE_WEIGHTS = {
        'zone_1_pre_contact': 0.15,
        'zone_2_arcing_engagement': 0.20,
        'zone_3_main_conduction': 0.35,  # Most critical
        'zone_4_parting': 0.20,
        'zone_5_final_open': 0.10,
    }
    # Weight used for a scored zone whose name is not listed in ZONE_WEIGHTS
    DEFAULT_ZONE_WEIGHT = 0.2

    # (substring of zone name, analysis method) - checked in this order,
    # the first substring found in the zone name wins.
    _ZONE_HANDLERS = (
        ('zone_1', '_analyze_zone_1_pre_contact'),
        ('zone_2', '_analyze_zone_2_arcing_engagement'),
        ('zone_3', '_analyze_zone_3_main_conduction'),
        ('zone_4', '_analyze_zone_4_parting'),
        ('zone_5', '_analyze_zone_5_final_open'),
    )

    def __init__(self, df: pd.DataFrame, zones_data: Dict[str, Any]):
        """
        Initialize the zone analyzer.

        Args:
            df: DataFrame with columns ['Time (ms)', 'Current', 'Resistance', 'Travel']
            zones_data: Dictionary containing zone segmentation information.
                The analysis reads zones_data['zones'], a mapping of zone name
                to a dict with 'start_ms' and 'end_ms'.
        """
        self.df = df
        self.zones_data = zones_data
        self.analysis_results = {}

    def analyze_all_zones(self) -> Dict[str, Any]:
        """
        Analyze all zones and return comprehensive health assessment.

        Returns:
            Dictionary containing analysis results for each non-empty zone
            plus an 'overall_health' entry, or {'error': ...} when zones_data
            has no 'zones' key.
        """
        if 'zones' not in self.zones_data:
            return {'error': 'No zone data available'}

        # Start from a clean dict so that a repeated call never carries over
        # zone entries from an earlier run.
        self.analysis_results = {}

        for zone_name, zone_info in self.zones_data['zones'].items():
            zone_df = self._extract_zone_data(zone_info)
            if zone_df is not None and len(zone_df) > 0:
                self.analysis_results[zone_name] = self._analyze_zone(
                    zone_name, zone_df, zone_info
                )

        # The overall entry is computed before it is stored, so it only sees
        # the per-zone results.
        overall_health = self._calculate_overall_health()
        self.analysis_results['overall_health'] = overall_health
        return self.analysis_results

    def _extract_zone_data(self, zone_info: Dict) -> pd.DataFrame:
        """Extract data for a specific zone based on time boundaries (inclusive)."""
        start_ms = zone_info.get('start_ms', 0)
        end_ms = zone_info.get('end_ms', 0)
        time_ms = self.df['Time (ms)']
        mask = (time_ms >= start_ms) & (time_ms <= end_ms)
        return self.df[mask].copy()

    def _analyze_zone(self, zone_name: str, zone_df: pd.DataFrame,
                      zone_info: Dict) -> Dict[str, Any]:
        """
        Analyze a specific zone based on its characteristics.

        Args:
            zone_name: Name of the zone
            zone_df: DataFrame containing zone data
            zone_info: Zone metadata

        Returns:
            Dictionary with zone analysis results. When the name matches no
            known zone, the defaults ('Unknown' status, score 0.0) are kept.
        """
        analysis = {
            'zone_name': zone_name,
            'duration_ms': zone_info.get('end_ms', 0) - zone_info.get('start_ms', 0),
            'health_status': 'Unknown',
            'health_score': 0.0,
            'issues': [],
            'metrics': {}
        }

        # Zone-specific analysis
        for key, method_name in self._ZONE_HANDLERS:
            if key in zone_name:
                analysis.update(getattr(self, method_name)(zone_df))
                break
        return analysis

    @staticmethod
    def _sample_std(values: pd.Series) -> float:
        """Return the sample standard deviation, or 0.0 for fewer than two samples."""
        # pandas returns NaN for a single sample; NaN would leak into the metrics.
        return float(values.std()) if len(values) > 1 else 0.0

    @staticmethod
    def _linear_slope(values: pd.Series) -> float:
        """Return the slope of a least-squares line fitted against the sample index."""
        x = np.arange(len(values))
        return float(np.polyfit(x, values.to_numpy(dtype=float), 1)[0])

    @staticmethod
    def _max_spike_ratio(values: pd.Series, peaks: List[int]) -> float:
        """Return the highest peak divided by the median, or 0.0 if the median is not positive."""
        max_spike = values.iloc[peaks].max()
        baseline = values.median()
        return float(max_spike / baseline) if baseline > 0 else 0.0

    def _build_zone_result(self, metrics: Dict, issues: List[str],
                           zone_type: str) -> Dict[str, Any]:
        """Score a zone and package metrics, issues, score and status label."""
        health_score = self._calculate_zone_health_score(metrics, issues, zone_type=zone_type)
        return {
            'metrics': metrics,
            'issues': issues,
            'health_score': health_score,
            'health_status': self._get_health_status(health_score)
        }

    def _analyze_zone_1_pre_contact(self, zone_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze Zone 1: Pre-Contact Travel

        Expected behavior:
        - Travel should be increasing (contacts moving)
        - Current should be near zero (no contact yet)
        - Resistance should be very high (infinite/open circuit)
        """
        metrics = {}
        issues = []

        # Check travel progression
        travel_values = zone_df['Travel'].dropna()
        if len(travel_values) > 1:
            travel_trend = self._linear_slope(travel_values)
            metrics['travel_rate'] = travel_trend
            if travel_trend < 0.1:
                issues.append('Travel not increasing properly - possible mechanical issue')

        # Check current is near baseline
        current_values = zone_df['Current'].dropna()
        if len(current_values) > 0:
            metrics['current_baseline'] = float(current_values.mean())
            metrics['current_stability'] = self._sample_std(current_values)

            # Current should rise towards end of zone
            if len(current_values) > 5:
                # Same window size for both ends; note that -n // 3 would
                # floor away from zero and make the late window larger.
                third = len(current_values) // 3
                early_current = current_values.iloc[:third].mean()
                late_current = current_values.iloc[-third:].mean()
                current_rise = late_current - early_current
                metrics['current_rise'] = float(current_rise)
                if current_rise < self.HEALTHY_THRESHOLDS['current_rise_rate_min']:
                    issues.append('Insufficient current rise - delayed contact engagement')

        return self._build_zone_result(metrics, issues, 'zone_1')

    def _analyze_zone_2_arcing_engagement(self, zone_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze Zone 2: Arcing Contact Engagement

        Expected behavior:
        - Resistance drops from high to moderate (arcing contacts engaging)
        - Should see resistance spikes (arcing activity)
        - Current starts flowing
        - Smooth gradual drop is healthy
        """
        metrics = {}
        issues = []

        resistance_values = zone_df['Resistance'].dropna()
        if len(resistance_values) > 2:
            # Check for gradual resistance drop (first three vs last three samples)
            res_start = resistance_values.iloc[:3].mean()
            res_end = resistance_values.iloc[-3:].mean()
            res_drop = res_start - res_end
            metrics['resistance_drop'] = float(res_drop)
            if res_drop < 0:
                issues.append('Resistance increasing instead of dropping - abnormal arcing')

            # Analyze resistance spikes (expected during arcing)
            res_peaks = self._detect_peaks(resistance_values)
            metrics['spike_count'] = len(res_peaks)
            if res_peaks:
                spike_ratio = self._max_spike_ratio(resistance_values, res_peaks)
                metrics['max_spike_ratio'] = spike_ratio
                if spike_ratio > self.HEALTHY_THRESHOLDS['max_resistance_spike_ratio']:
                    issues.append(f'Excessive resistance spikes ({spike_ratio:.1f}x) - possible contact damage')

            # Check smoothness of transition
            smoothness = self._calculate_smoothness(resistance_values)
            metrics['transition_smoothness'] = float(smoothness)
            if smoothness < 0.6:  # Lower threshold for arcing zone (spikes expected)
                issues.append('Erratic resistance pattern - possible contact erosion')

        # Check current flow
        current_values = zone_df['Current'].dropna()
        if len(current_values) > 0:
            metrics['current_mean'] = float(current_values.mean())
            metrics['current_max'] = float(current_values.max())

        return self._build_zone_result(metrics, issues, 'zone_2')

    def _analyze_zone_3_main_conduction(self, zone_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze Zone 3: Main Contact Conduction

        Expected behavior:
        - Resistance should be LOW and STABLE (30-80 µΩ ideal)
        - Travel should be at maximum (plateau)
        - Current should be stable
        - This is the "healthy contact" signature zone
        """
        metrics = {}
        issues = []

        resistance_values = zone_df['Resistance'].dropna()
        if len(resistance_values) > 0:
            res_mean = resistance_values.mean()
            res_std = self._sample_std(resistance_values)
            metrics['resistance_mean'] = float(res_mean)
            metrics['resistance_std'] = res_std
            metrics['resistance_range'] = float(resistance_values.max() - resistance_values.min())

            # Check if resistance is in healthy range
            # Note: Graph units may not be µΩ, so we check relative stability instead
            oscillation_pct = (res_std / res_mean * 100) if res_mean > 0 else 0
            metrics['oscillation_percentage'] = float(oscillation_pct)
            if oscillation_pct > self.HEALTHY_THRESHOLDS['max_oscillation_percentage']:
                issues.append(f'Excessive resistance oscillation ({oscillation_pct:.1f}%) - poor contact quality')

            # Check for stability (should be flat)
            smoothness = self._calculate_smoothness(resistance_values)
            metrics['resistance_stability'] = float(smoothness)
            if smoothness < self.HEALTHY_THRESHOLDS['smoothness_threshold']:
                issues.append('Unstable resistance - possible contact bouncing or misalignment')

        # Check travel plateau
        travel_values = zone_df['Travel'].dropna()
        if len(travel_values) > 0:
            travel_variation = self._sample_std(travel_values)
            metrics['travel_variation'] = travel_variation
            if travel_variation > self.HEALTHY_THRESHOLDS['travel_stability_threshold']:
                issues.append('Travel not stable - mechanical issue during conduction')

        # Check current stability
        current_values = zone_df['Current'].dropna()
        if len(current_values) > 0:
            current_std = self._sample_std(current_values)
            current_mean = current_values.mean()
            current_stability = (current_std / current_mean * 100) if current_mean > 0 else 0
            metrics['current_stability_pct'] = float(current_stability)

        return self._build_zone_result(metrics, issues, 'zone_3')

    def _analyze_zone_4_parting(self, zone_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze Zone 4: Main Contact Parting (The Break)

        Expected behavior:
        - Resistance should INCREASE sharply (contacts separating)
        - May see resistance spikes (arcing during separation)
        - Travel should start decreasing (opening)
        - Smooth increase is healthy
        """
        metrics = {}
        issues = []

        resistance_values = zone_df['Resistance'].dropna()
        if len(resistance_values) > 2:
            # Check for resistance increase (last three vs first three samples)
            res_start = resistance_values.iloc[:3].mean()
            res_end = resistance_values.iloc[-3:].mean()
            res_increase = res_end - res_start
            metrics['resistance_increase'] = float(res_increase)
            if res_increase < 0:
                issues.append('Resistance decreasing during parting - abnormal behavior')

            # Check rate of increase
            res_trend = self._linear_slope(resistance_values)
            metrics['resistance_rise_rate'] = res_trend
            if res_trend < 0.1:
                issues.append('Slow resistance rise - possible contact sticking')

            # Analyze spikes during parting (some arcing is normal)
            res_peaks = self._detect_peaks(resistance_values)
            metrics['parting_spike_count'] = len(res_peaks)
            if res_peaks:
                spike_ratio = self._max_spike_ratio(resistance_values, res_peaks)
                metrics['max_parting_spike_ratio'] = spike_ratio
                # Parting tolerates 1.5x the spike ratio allowed elsewhere
                if spike_ratio > self.HEALTHY_THRESHOLDS['max_resistance_spike_ratio'] * 1.5:
                    issues.append(f'Excessive parting spikes ({spike_ratio:.1f}x) - severe arcing or contact damage')

        # Check travel movement
        travel_values = zone_df['Travel'].dropna()
        if len(travel_values) > 1:
            travel_trend = self._linear_slope(travel_values)
            metrics['travel_opening_rate'] = travel_trend
            if travel_trend > -0.1:  # Should be negative (decreasing)
                issues.append('Travel not decreasing properly - mechanical opening issue')

        return self._build_zone_result(metrics, issues, 'zone_4')

    def _analyze_zone_5_final_open(self, zone_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze Zone 5: Final Open State

        Expected behavior:
        - Resistance should be very high and stable (infinite/open circuit)
        - Travel should be stable at minimum (fully open)
        - Current should be zero
        """
        metrics = {}
        issues = []

        resistance_values = zone_df['Resistance'].dropna()
        if len(resistance_values) > 0:
            res_mean = resistance_values.mean()
            res_std = self._sample_std(resistance_values)
            metrics['final_resistance_mean'] = float(res_mean)
            metrics['final_resistance_stability'] = res_std

            # Should be stable (flat line at high value)
            stability_pct = (res_std / res_mean * 100) if res_mean > 0 else 0
            metrics['stability_percentage'] = float(stability_pct)
            if stability_pct > 10:
                issues.append('Unstable final resistance - possible incomplete opening')

        # Check travel is stable
        travel_values = zone_df['Travel'].dropna()
        if len(travel_values) > 0:
            travel_std = self._sample_std(travel_values)
            metrics['travel_final_stability'] = travel_std
            if travel_std > 3:
                issues.append('Travel unstable in final state - mechanical issue')

        # Check current is near zero
        current_values = zone_df['Current'].dropna()
        if len(current_values) > 0:
            current_mean = current_values.mean()
            metrics['final_current'] = float(current_mean)

            # Current should be very low in open state; the reference is the
            # mean of the first ten rows of the full recording.
            initial_current = self.df['Current'].iloc[:10].mean()  # Baseline from start
            if current_mean > initial_current * 1.5:
                issues.append('Elevated current in open state - possible leakage')

        return self._build_zone_result(metrics, issues, 'zone_5')

    def _detect_peaks(self, signal: pd.Series, prominence_factor: float = 0.3) -> List[int]:
        """
        Detect peaks in a signal.

        A sample is a peak when it is strictly greater than both neighbours
        and rises above the higher of the two local minima (up to five samples
        on each side) by at least prominence_factor times the signal range.

        Args:
            signal: Input signal
            prominence_factor: Minimum prominence as fraction of signal range

        Returns:
            List of peak positions (0-based positions within the signal)
        """
        if len(signal) < 3:
            return []

        values = signal.values
        min_prominence = (values.max() - values.min()) * prominence_factor

        peaks = []
        for i in range(1, len(values) - 1):
            if values[i] > values[i - 1] and values[i] > values[i + 1]:
                # Check prominence
                left_min = values[max(0, i - 5):i].min()
                right_min = values[i + 1:i + 6].min()
                prominence = values[i] - max(left_min, right_min)
                if prominence >= min_prominence:
                    peaks.append(i)
        return peaks

    def _calculate_smoothness(self, signal: pd.Series) -> float:
        """
        Calculate smoothness of a signal using correlation with a fitted curve.

        Args:
            signal: Input signal

        Returns:
            Smoothness score (0-1, higher is smoother). Signals shorter than
            three samples score 0.0; a constant signal scores 1.0.
        """
        if len(signal) < 3:
            return 0.0

        x = np.arange(len(signal))
        y = signal.values

        # A perfectly flat signal has zero variance, which makes the
        # correlation undefined (NaN). Flat is the smoothest possible shape,
        # so report it as such instead of falling through to 0.0.
        if y.max() == y.min():
            return 1.0

        # Fit a polynomial (degree 2 for curves, degree 1 for lines)
        try:
            coeffs = np.polyfit(x, y, deg=2)
            fitted = np.polyval(coeffs, x)
            # Calculate correlation
            correlation = np.corrcoef(y, fitted)[0, 1]
        except (np.linalg.LinAlgError, ValueError, TypeError):
            # Fit failed (e.g. non-numeric data or no convergence)
            return 0.0
        return abs(correlation) if not np.isnan(correlation) else 0.0

    def _calculate_zone_health_score(self, metrics: Dict, issues: List[str],
                                     zone_type: str) -> float:
        """
        Calculate health score for a zone (0-100).

        Args:
            metrics: Zone metrics
            issues: List of detected issues
            zone_type: Type of zone ('zone_1' ... 'zone_5')

        Returns:
            Health score (0-100)
        """
        # Start with perfect score and deduct 15 points for each issue
        score = 100.0 - len(issues) * 15

        # Zone-specific scoring adjustments
        if zone_type == 'zone_3':  # Main conduction - most critical
            if 'oscillation_percentage' in metrics:
                osc = metrics['oscillation_percentage']
                if osc > 20:
                    score -= 20
                elif osc > 15:
                    score -= 10
            if 'resistance_stability' in metrics:
                if metrics['resistance_stability'] < 0.85:
                    score -= 15
        elif zone_type in ('zone_2', 'zone_4'):  # Arcing zones
            if 'max_spike_ratio' in metrics or 'max_parting_spike_ratio' in metrics:
                spike_key = 'max_spike_ratio' if 'max_spike_ratio' in metrics else 'max_parting_spike_ratio'
                spike_ratio = metrics[spike_key]
                if spike_ratio > 5:
                    score -= 25
                elif spike_ratio > 3:
                    score -= 10

        # Ensure score is in valid range
        return max(0.0, min(100.0, score))

    def _get_health_status(self, score: float) -> str:
        """Convert health score to status label."""
        if score >= 85:
            return 'Excellent'
        elif score >= 70:
            return 'Good'
        elif score >= 50:
            return 'Fair'
        elif score >= 30:
            return 'Poor'
        else:
            return 'Critical'

    def _calculate_overall_health(self) -> Dict[str, Any]:
        """
        Calculate overall health assessment across all zones.

        Returns:
            Dictionary with overall health metrics. When there is nothing to
            score, a short dict with 'status' and a zero score is returned;
            it carries the score under both 'score' (legacy key) and
            'overall_score' (the key used by the normal result).
        """
        if not self.analysis_results:
            return {'status': 'No data', 'score': 0.0, 'overall_score': 0.0}

        # Only entries that carry a health score take part in the average
        scored = [
            (zone_name, analysis)
            for zone_name, analysis in self.analysis_results.items()
            if isinstance(analysis, dict) and 'health_score' in analysis
        ]
        if not scored:
            return {'status': 'Unknown', 'score': 0.0, 'overall_score': 0.0}

        all_issues = []
        weighted_score = 0.0
        total_weight = 0.0
        for zone_name, analysis in scored:
            all_issues.extend(analysis.get('issues', []))
            weight = self.ZONE_WEIGHTS.get(zone_name, self.DEFAULT_ZONE_WEIGHT)
            weighted_score += analysis['health_score'] * weight
            total_weight += weight

        # Normalise by the weights actually present so missing zones do not
        # drag the score down.
        overall_score = weighted_score / total_weight if total_weight > 0 else 0.0

        return {
            'overall_score': round(overall_score, 2),
            'status': self._get_health_status(overall_score),
            'total_issues': len(all_issues),
            'critical_issues': [issue for issue in all_issues if 'severe' in issue.lower() or 'critical' in issue.lower()],
            'recommendation': self._generate_recommendation(overall_score, all_issues)
        }

    def _generate_recommendation(self, score: float, issues: List[str]) -> str:
        """Generate maintenance recommendation based on analysis.

        Only the score drives the text; issues is accepted but not used.
        """
        if score >= 85:
            return 'Circuit breaker is in excellent condition. Continue regular monitoring.'
        elif score >= 70:
            return 'Circuit breaker is in good condition. Schedule routine maintenance as planned.'
        elif score >= 50:
            return 'Circuit breaker shows signs of wear. Increase monitoring frequency and plan maintenance.'
        elif score >= 30:
            return 'Circuit breaker condition is poor. Schedule maintenance soon to prevent failure.'
        else:
            return 'CRITICAL: Circuit breaker requires immediate attention. Risk of failure is high.'
def analyze_zones_with_image(df: pd.DataFrame, zones_data: Dict[str, Any],
                             annotated_image: np.ndarray = None) -> Dict[str, Any]:
    """
    Run the zone analysis and, when an image is supplied, attach an annotated copy.

    Args:
        df: DataFrame with DCRM data
        zones_data: Zone segmentation data
        annotated_image: Optional image to annotate with analysis results

    Returns:
        The analysis results; when an image was given they also contain the
        annotated image under the key 'annotated_image'.
    """
    report = ZoneAnalyzer(df, zones_data).analyze_all_zones()

    # Without an image there is nothing to draw on
    if annotated_image is None:
        return report

    report['annotated_image'] = _annotate_image_with_health(
        annotated_image, report, zones_data
    )
    return report
def _annotate_image_with_health(image: np.ndarray, analysis_results: Dict[str, Any],
                                zones_data: Dict[str, Any]) -> np.ndarray:
    """
    Annotate image with health status for each zone.

    One text label "<status> (<score>)" is drawn per analysed zone, stacked
    top to bottom in the order the zones appear in zones_data['zones'], so
    that the labels do not overwrite each other.

    Args:
        image: Input image (left unmodified; a copy is annotated)
        analysis_results: Analysis results from ZoneAnalyzer
        zones_data: Zone segmentation data

    Returns:
        Annotated copy of the image
    """
    annotated = image.copy()
    height = annotated.shape[0]

    # Color coding for health status.
    # NOTE(review): the tuples read as RGB; OpenCV drawing interprets them in
    # the image's own channel order - confirm the image is RGB, not BGR.
    status_colors = {
        'Excellent': (0, 255, 0),    # Green
        'Good': (144, 238, 144),     # Light Green
        'Fair': (255, 255, 0),       # Yellow
        'Poor': (255, 165, 0),       # Orange
        'Critical': (255, 0, 0)      # Red
    }

    # Collect the labels first so OpenCV is only needed when there is
    # actually something to draw.
    labels = []
    if 'zones' in zones_data:
        for zone_name in zones_data['zones']:
            if zone_name not in analysis_results:
                continue
            analysis = analysis_results[zone_name]
            status = analysis.get('health_status', 'Unknown')
            color = status_colors.get(status, (128, 128, 128))
            text = f"{status} ({analysis.get('health_score', 0):.0f})"
            labels.append((text, color))

    if not labels:
        return annotated

    import cv2

    first_baseline = 30  # y of the first label's text baseline
    line_spacing = 20    # vertical distance between stacked labels
    for row, (text, color) in enumerate(labels):
        y_pos = first_baseline + row * line_spacing
        if y_pos >= height:
            # Remaining labels would fall below the image
            break
        cv2.putText(annotated, text, (10, y_pos),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return annotated