Spaces:
Sleeping
Sleeping
| """ | |
| Image-Based Zone Analysis Module for DCRM Curves | |
| This module analyzes zones directly from the annotated image with segmentation lines, | |
| providing visual analysis of each zone based on the actual image content. | |
| """ | |
| import cv2 | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Any | |
| import pandas as pd | |
| class ImageZoneAnalyzer: | |
| """Analyzes zones directly from the segmented image.""" | |
| def __init__(self, image: np.ndarray, zones_data: Dict[str, Any], | |
| bounds: Tuple[int, int], total_duration: float): | |
| """ | |
| Initialize the image-based zone analyzer. | |
| Args: | |
| image: Original image (BGR format) | |
| zones_data: Dictionary containing zone segmentation information | |
| bounds: (start_x, end_x) boundaries of the graph | |
| total_duration: Total duration in milliseconds | |
| """ | |
| self.image = image | |
| self.zones_data = zones_data | |
| self.bounds = bounds | |
| self.total_duration = total_duration | |
| self.analysis_results = {} | |
| # Extract graph region | |
| sx, ex = bounds | |
| self.graph_width = ex - sx | |
| self.graph_image = image[:, sx:ex] | |
| def analyze_all_zones(self) -> Dict[str, Any]: | |
| """ | |
| Analyze all zones based on image content. | |
| Returns: | |
| Dictionary containing analysis results for each zone | |
| """ | |
| if 'zones' not in self.zones_data: | |
| return {'error': 'No zone data available'} | |
| zones = self.zones_data['zones'] | |
| # Analyze each zone | |
| for zone_name, zone_info in zones.items(): | |
| zone_image = self._extract_zone_image(zone_info) | |
| if zone_image is not None and zone_image.shape[1] > 0: | |
| analysis = self._analyze_zone_image(zone_name, zone_image, zone_info) | |
| self.analysis_results[zone_name] = analysis | |
| # Generate overall health assessment | |
| overall_health = self._calculate_overall_health() | |
| self.analysis_results['overall_health'] = overall_health | |
| return self.analysis_results | |
| def _extract_zone_image(self, zone_info: Dict) -> np.ndarray: | |
| """Extract image region for a specific zone.""" | |
| start_ms = zone_info.get('start_ms', 0) | |
| end_ms = zone_info.get('end_ms', 0) | |
| # Convert time to pixel coordinates | |
| start_x = int((start_ms / self.total_duration) * self.graph_width) | |
| end_x = int((end_ms / self.total_duration) * self.graph_width) | |
| # Ensure valid bounds | |
| start_x = max(0, min(start_x, self.graph_width - 1)) | |
| end_x = max(start_x + 1, min(end_x, self.graph_width)) | |
| return self.graph_image[:, start_x:end_x] | |
| def _analyze_zone_image(self, zone_name: str, zone_image: np.ndarray, | |
| zone_info: Dict) -> Dict[str, Any]: | |
| """ | |
| Analyze a zone based on its image content. | |
| Args: | |
| zone_name: Name of the zone | |
| zone_image: Image region for this zone | |
| zone_info: Zone metadata | |
| Returns: | |
| Dictionary with zone analysis results | |
| """ | |
| analysis = { | |
| 'zone_name': zone_name, | |
| 'duration_ms': zone_info.get('end_ms', 0) - zone_info.get('start_ms', 0), | |
| 'health_status': 'Unknown', | |
| 'health_score': 0.0, | |
| 'issues': [], | |
| 'metrics': {} | |
| } | |
| # Extract color channels for each curve | |
| red_mask = self._extract_color_mask(zone_image, 'red') | |
| green_mask = self._extract_color_mask(zone_image, 'green') | |
| blue_mask = self._extract_color_mask(zone_image, 'blue') | |
| # Analyze based on zone type | |
| if 'zone_1' in zone_name: | |
| analysis.update(self._analyze_zone_1_image(zone_image, red_mask, green_mask, blue_mask)) | |
| elif 'zone_2' in zone_name: | |
| analysis.update(self._analyze_zone_2_image(zone_image, red_mask, green_mask, blue_mask)) | |
| elif 'zone_3' in zone_name: | |
| analysis.update(self._analyze_zone_3_image(zone_image, red_mask, green_mask, blue_mask)) | |
| elif 'zone_4' in zone_name: | |
| analysis.update(self._analyze_zone_4_image(zone_image, red_mask, green_mask, blue_mask)) | |
| elif 'zone_5' in zone_name: | |
| analysis.update(self._analyze_zone_5_image(zone_image, red_mask, green_mask, blue_mask)) | |
| return analysis | |
| def _extract_color_mask(self, image: np.ndarray, color: str) -> np.ndarray: | |
| """Extract mask for a specific color curve.""" | |
| hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) | |
| if color == 'red': | |
| lower1 = np.array([0, 50, 50]) | |
| upper1 = np.array([10, 255, 255]) | |
| lower2 = np.array([170, 50, 50]) | |
| upper2 = np.array([180, 255, 255]) | |
| mask = cv2.bitwise_or(cv2.inRange(hsv, lower1, upper1), | |
| cv2.inRange(hsv, lower2, upper2)) | |
| elif color == 'green': | |
| lower = np.array([35, 50, 50]) | |
| upper = np.array([85, 255, 255]) | |
| mask = cv2.inRange(hsv, lower, upper) | |
| elif color == 'blue': | |
| lower = np.array([90, 50, 50]) | |
| upper = np.array([130, 255, 255]) | |
| mask = cv2.inRange(hsv, lower, upper) | |
| else: | |
| mask = np.zeros(image.shape[:2], dtype=np.uint8) | |
| return mask | |
| def _analyze_zone_1_image(self, zone_img, red_mask, green_mask, blue_mask): | |
| """Analyze Zone 1 from image.""" | |
| metrics = {} | |
| issues = [] | |
| # Check red curve (travel) progression | |
| red_profile = self._get_vertical_profile(red_mask) | |
| if len(red_profile) > 0: | |
| # Travel should be present and relatively stable/increasing | |
| red_coverage = np.sum(red_mask > 0) / red_mask.size * 100 | |
| metrics['travel_coverage_pct'] = float(red_coverage) | |
| if red_coverage < 5: | |
| issues.append('Low travel signal visibility - possible data quality issue') | |
| # Check blue curve (current) - should be low/baseline | |
| blue_profile = self._get_vertical_profile(blue_mask) | |
| if len(blue_profile) > 0: | |
| blue_coverage = np.sum(blue_mask > 0) / blue_mask.size * 100 | |
| metrics['current_coverage_pct'] = float(blue_coverage) | |
| # Current should start rising towards end | |
| if blue_coverage > 20: | |
| issues.append('High current activity - possible early contact') | |
| health_score = self._calculate_image_health_score(metrics, issues) | |
| return { | |
| 'metrics': metrics, | |
| 'issues': issues, | |
| 'health_score': health_score, | |
| 'health_status': self._get_health_status(health_score) | |
| } | |
| def _analyze_zone_2_image(self, zone_img, red_mask, green_mask, blue_mask): | |
| """Analyze Zone 2 from image - Arcing engagement.""" | |
| metrics = {} | |
| issues = [] | |
| # Check green curve (resistance) for spikes | |
| green_profile = self._get_vertical_profile(green_mask) | |
| if len(green_profile) > 0: | |
| # Detect spikes in resistance | |
| spike_count = self._count_spikes_in_mask(green_mask) | |
| metrics['resistance_spike_count'] = spike_count | |
| green_coverage = np.sum(green_mask > 0) / green_mask.size * 100 | |
| metrics['resistance_coverage_pct'] = float(green_coverage) | |
| # Check for excessive spiking | |
| if spike_count > 10: | |
| issues.append(f'Excessive resistance spikes ({spike_count}) - possible contact damage') | |
| # Check vertical spread (indicates spike height) | |
| vertical_spread = self._get_vertical_spread(green_mask) | |
| metrics['resistance_vertical_spread'] = float(vertical_spread) | |
| if vertical_spread > zone_img.shape[0] * 0.5: | |
| issues.append('Very high resistance spikes - severe arcing') | |
| # Check blue curve (current) activity | |
| blue_coverage = np.sum(blue_mask > 0) / blue_mask.size * 100 | |
| metrics['current_coverage_pct'] = float(blue_coverage) | |
| health_score = self._calculate_image_health_score(metrics, issues) | |
| return { | |
| 'metrics': metrics, | |
| 'issues': issues, | |
| 'health_score': health_score, | |
| 'health_status': self._get_health_status(health_score) | |
| } | |
| def _analyze_zone_3_image(self, zone_img, red_mask, green_mask, blue_mask): | |
| """Analyze Zone 3 from image - Main conduction (most critical).""" | |
| metrics = {} | |
| issues = [] | |
| # Green curve (resistance) should be low and stable | |
| if np.sum(green_mask) > 0: | |
| # Check vertical spread (should be minimal - flat line) | |
| vertical_spread = self._get_vertical_spread(green_mask) | |
| metrics['resistance_vertical_spread'] = float(vertical_spread) | |
| # Calculate stability (lower spread = more stable) | |
| height = zone_img.shape[0] | |
| stability_score = max(0, 100 - (vertical_spread / height * 100)) | |
| metrics['resistance_stability_score'] = float(stability_score) | |
| if vertical_spread > height * 0.15: | |
| issues.append(f'Unstable resistance (spread: {vertical_spread:.0f}px) - poor contact quality') | |
| # Check for oscillations | |
| oscillation_count = self._count_oscillations(green_mask) | |
| metrics['resistance_oscillation_count'] = oscillation_count | |
| if oscillation_count > 5: | |
| issues.append(f'Excessive oscillations ({oscillation_count}) - contact bouncing') | |
| # Check coverage (should be continuous) | |
| green_coverage = np.sum(green_mask > 0) / green_mask.size * 100 | |
| metrics['resistance_coverage_pct'] = float(green_coverage) | |
| if green_coverage < 10: | |
| issues.append('Low resistance signal - possible data extraction issue') | |
| # Red curve (travel) should be stable at plateau | |
| if np.sum(red_mask) > 0: | |
| travel_spread = self._get_vertical_spread(red_mask) | |
| metrics['travel_vertical_spread'] = float(travel_spread) | |
| if travel_spread > height * 0.1: | |
| issues.append('Travel not stable - mechanical issue during conduction') | |
| health_score = self._calculate_image_health_score(metrics, issues) | |
| return { | |
| 'metrics': metrics, | |
| 'issues': issues, | |
| 'health_score': health_score, | |
| 'health_status': self._get_health_status(health_score) | |
| } | |
| def _analyze_zone_4_image(self, zone_img, red_mask, green_mask, blue_mask): | |
| """Analyze Zone 4 from image - Parting.""" | |
| metrics = {} | |
| issues = [] | |
| # Green curve (resistance) should be increasing | |
| if np.sum(green_mask) > 0: | |
| # Check for upward trend | |
| green_profile = self._get_vertical_profile(green_mask) | |
| if len(green_profile) > 2: | |
| # Compare left vs right side vertical positions | |
| left_avg = np.mean(green_profile[:len(green_profile)//3]) | |
| right_avg = np.mean(green_profile[-len(green_profile)//3:]) | |
| # Lower pixel value = higher on graph | |
| if left_avg < right_avg: | |
| metrics['resistance_trend'] = 'decreasing' | |
| issues.append('Resistance decreasing during parting - abnormal behavior') | |
| else: | |
| metrics['resistance_trend'] = 'increasing' | |
| # Check for parting spikes | |
| spike_count = self._count_spikes_in_mask(green_mask) | |
| metrics['parting_spike_count'] = spike_count | |
| vertical_spread = self._get_vertical_spread(green_mask) | |
| metrics['resistance_vertical_spread'] = float(vertical_spread) | |
| if spike_count > 15: | |
| issues.append(f'Excessive parting spikes ({spike_count}) - severe arcing') | |
| # Red curve (travel) should be decreasing (opening) | |
| if np.sum(red_mask) > 0: | |
| red_profile = self._get_vertical_profile(red_mask) | |
| if len(red_profile) > 2: | |
| left_avg = np.mean(red_profile[:len(red_profile)//3]) | |
| right_avg = np.mean(red_profile[-len(red_profile)//3:]) | |
| # Higher pixel value = lower on graph (opening) | |
| if left_avg > right_avg: | |
| issues.append('Travel not decreasing - mechanical opening issue') | |
| health_score = self._calculate_image_health_score(metrics, issues) | |
| return { | |
| 'metrics': metrics, | |
| 'issues': issues, | |
| 'health_score': health_score, | |
| 'health_status': self._get_health_status(health_score) | |
| } | |
| def _analyze_zone_5_image(self, zone_img, red_mask, green_mask, blue_mask): | |
| """Analyze Zone 5 from image - Final open state.""" | |
| metrics = {} | |
| issues = [] | |
| # Green curve (resistance) should be high and stable | |
| if np.sum(green_mask) > 0: | |
| vertical_spread = self._get_vertical_spread(green_mask) | |
| metrics['resistance_vertical_spread'] = float(vertical_spread) | |
| if vertical_spread > zone_img.shape[0] * 0.1: | |
| issues.append('Unstable final resistance - incomplete opening') | |
| green_coverage = np.sum(green_mask > 0) / green_mask.size * 100 | |
| metrics['resistance_coverage_pct'] = float(green_coverage) | |
| # Blue curve (current) should be minimal | |
| if np.sum(blue_mask) > 0: | |
| blue_coverage = np.sum(blue_mask > 0) / blue_mask.size * 100 | |
| metrics['current_coverage_pct'] = float(blue_coverage) | |
| if blue_coverage > 10: | |
| issues.append('Elevated current in open state - possible leakage') | |
| health_score = self._calculate_image_health_score(metrics, issues) | |
| return { | |
| 'metrics': metrics, | |
| 'issues': issues, | |
| 'health_score': health_score, | |
| 'health_status': self._get_health_status(health_score) | |
| } | |
| def _get_vertical_profile(self, mask: np.ndarray) -> np.ndarray: | |
| """Get vertical position profile across horizontal axis.""" | |
| profile = [] | |
| for x in range(mask.shape[1]): | |
| col = mask[:, x] | |
| if np.sum(col) > 0: | |
| # Get center of mass in this column | |
| indices = np.where(col > 0)[0] | |
| center = np.mean(indices) | |
| profile.append(center) | |
| return np.array(profile) | |
| def _get_vertical_spread(self, mask: np.ndarray) -> float: | |
| """Calculate vertical spread of a mask (height of signal).""" | |
| if np.sum(mask) == 0: | |
| return 0.0 | |
| # Find min and max y coordinates where mask is active | |
| y_coords = np.where(mask > 0)[0] | |
| if len(y_coords) == 0: | |
| return 0.0 | |
| return float(np.max(y_coords) - np.min(y_coords)) | |
| def _count_spikes_in_mask(self, mask: np.ndarray) -> int: | |
| """Count number of spikes in a mask.""" | |
| profile = self._get_vertical_profile(mask) | |
| if len(profile) < 3: | |
| return 0 | |
| # Detect peaks | |
| spike_count = 0 | |
| for i in range(1, len(profile) - 1): | |
| # Peak if lower than neighbors (remember: lower y = higher on graph) | |
| if profile[i] < profile[i-1] and profile[i] < profile[i+1]: | |
| # Check if significant | |
| if abs(profile[i] - profile[i-1]) > 5 or abs(profile[i] - profile[i+1]) > 5: | |
| spike_count += 1 | |
| return spike_count | |
| def _count_oscillations(self, mask: np.ndarray) -> int: | |
| """Count oscillations in the signal.""" | |
| profile = self._get_vertical_profile(mask) | |
| if len(profile) < 5: | |
| return 0 | |
| # Simple moving average smoothing (no scipy needed) | |
| window_size = min(5, len(profile) // 3) | |
| if window_size < 2: | |
| smoothed = profile | |
| else: | |
| smoothed = np.convolve(profile, np.ones(window_size)/window_size, mode='same') | |
| # Count direction changes | |
| oscillations = 0 | |
| direction = 0 # 0: none, 1: up, -1: down | |
| for i in range(1, len(smoothed)): | |
| diff = smoothed[i] - smoothed[i-1] | |
| if abs(diff) > 2: # Threshold for significant change | |
| new_direction = 1 if diff > 0 else -1 | |
| if direction != 0 and new_direction != direction: | |
| oscillations += 1 | |
| direction = new_direction | |
| return oscillations | |
| def _calculate_image_health_score(self, metrics: Dict, issues: List[str]) -> float: | |
| """Calculate health score based on image analysis.""" | |
| score = 100.0 | |
| # Deduct for issues | |
| score -= len(issues) * 15 | |
| # Additional deductions based on metrics | |
| if 'resistance_vertical_spread' in metrics: | |
| spread = metrics['resistance_vertical_spread'] | |
| if spread > 100: | |
| score -= 20 | |
| elif spread > 50: | |
| score -= 10 | |
| if 'resistance_spike_count' in metrics: | |
| spikes = metrics['resistance_spike_count'] | |
| if spikes > 15: | |
| score -= 25 | |
| elif spikes > 10: | |
| score -= 15 | |
| return max(0.0, min(100.0, score)) | |
| def _get_health_status(self, score: float) -> str: | |
| """Convert health score to status label.""" | |
| if score >= 85: | |
| return 'Excellent' | |
| elif score >= 70: | |
| return 'Good' | |
| elif score >= 50: | |
| return 'Fair' | |
| elif score >= 30: | |
| return 'Poor' | |
| else: | |
| return 'Critical' | |
| def _calculate_overall_health(self) -> Dict[str, Any]: | |
| """Calculate overall health assessment.""" | |
| if not self.analysis_results: | |
| return {'status': 'No data', 'score': 0.0} | |
| zone_scores = [] | |
| all_issues = [] | |
| for zone_name, analysis in self.analysis_results.items(): | |
| if isinstance(analysis, dict) and 'health_score' in analysis: | |
| zone_scores.append(analysis['health_score']) | |
| all_issues.extend(analysis.get('issues', [])) | |
| if not zone_scores: | |
| return {'status': 'Unknown', 'score': 0.0} | |
| # Weighted average | |
| weights = { | |
| 'zone_1_pre_contact': 0.15, | |
| 'zone_2_arcing_engagement': 0.20, | |
| 'zone_3_main_conduction': 0.35, | |
| 'zone_4_parting': 0.20, | |
| 'zone_5_final_open': 0.10 | |
| } | |
| weighted_score = 0.0 | |
| total_weight = 0.0 | |
| for zone_name, analysis in self.analysis_results.items(): | |
| if isinstance(analysis, dict) and 'health_score' in analysis: | |
| weight = weights.get(zone_name, 0.2) | |
| weighted_score += analysis['health_score'] * weight | |
| total_weight += weight | |
| overall_score = weighted_score / total_weight if total_weight > 0 else 0.0 | |
| return { | |
| 'overall_score': round(overall_score, 2), | |
| 'status': self._get_health_status(overall_score), | |
| 'total_issues': len(all_issues), | |
| 'critical_issues': [issue for issue in all_issues if 'severe' in issue.lower() or 'critical' in issue.lower()], | |
| 'recommendation': self._generate_recommendation(overall_score, all_issues) | |
| } | |
| def _generate_recommendation(self, score: float, issues: List[str]) -> str: | |
| """Generate maintenance recommendation.""" | |
| if score >= 85: | |
| return 'Circuit breaker is in excellent condition. Continue regular monitoring.' | |
| elif score >= 70: | |
| return 'Circuit breaker is in good condition. Schedule routine maintenance as planned.' | |
| elif score >= 50: | |
| return 'Circuit breaker shows signs of wear. Increase monitoring frequency and plan maintenance.' | |
| elif score >= 30: | |
| return 'Circuit breaker condition is poor. Schedule maintenance soon to prevent failure.' | |
| else: | |
| return 'CRITICAL: Circuit breaker requires immediate attention. Risk of failure is high.' | |