# Source path: FLASK_APP/core/engines/advanced_rules.py
# (Repository page residue: uploaded by pranit144, commit "Upload 97 files", e38de99 verified.)
# Previous Name: analysis/engines/advanced_rule_engine.py
import numpy as np
import pandas as pd
import json
import sys
from scipy.signal import savgol_filter, find_peaks, welch
from scipy.stats import skew, kurtosis
from scipy.integrate import simpson
# Set UTF-8 encoding for console output (report strings contain µ and Ω).
# Guarded because sys.stdout may be None or a replaced stream object that has
# no reconfigure() method; in those cases the stream is left untouched.
if (
    hasattr(sys.stdout, 'reconfigure')
    and (sys.stdout.encoding or '').lower() != 'utf-8'
):
    sys.stdout.reconfigure(encoding='utf-8')
class AdvancedDCRMEngine:
    """
    Top-Notch Advanced Rule-Based DCRM Engine
    =========================================
    Combines Physics-Based Signal Processing (Scipy) with Expert Heuristic Logic.

    Features:
    - 12-Class Defect Detection (Primary + Secondary)
    - Evidence-Based Scoring (0-100% Confidence)
    - Advanced Signal Processing:
        * Savitzky-Golay Filtering (Noise reduction without edge blurring)
        * FFT (Mechanical Chatter detection)
        * Peak Finding (Spike counting, Bounce detection)
        * Derivative Analysis (Jerk/Stutter detection)
        * Energy Integration (Arcing Ablation)
    """

    # Sampling rate used for the Welch PSD and for converting per-sample power
    # into energy. 1000 Hz reproduces the previously hard-coded `fs=1000` and
    # `* 0.001` factors. Kept at class level so subclasses that override
    # __init__ still see it.
    SAMPLE_RATE_HZ = 1000.0

    def __init__(self):
        # --- CONFIGURATION & THRESHOLDS ---
        # 1. Signal Processing Config
        self.SAVGOL_WINDOW = 11      # Window length for smoothing (must be odd)
        self.SAVGOL_POLYORDER = 2    # Polynomial order

        # 2. Physics Thresholds (Strict - Industry Standard)
        self.R_OPEN_THRESHOLD = 1_000_000  # µΩ
        self.R_MAIN_MAX_HEALTHY = 50.0     # µΩ
        self.R_MAIN_WARNING = 80.0         # µΩ
        self.R_MAIN_CRITICAL = 150.0       # µΩ

        # 3. Wear & Arcing Thresholds
        self.ARCING_SPIKE_CRITICAL = 8000      # µΩ
        self.ARCING_SPIKE_SEVERE = 5000        # µΩ
        self.ARCING_ENERGY_CRITICAL = 2000.0   # Joules

        # 4. Mechanical Thresholds
        self.BOUNCE_PROMINENCE = 500           # µΩ
        self.JERK_THRESHOLD = 500.0            # µΩ/ms^2
        self.FFT_CHATTER_POWER_THRESHOLD = 100.0

    def analyze(self, df: pd.DataFrame, segments: dict, kpis: dict = None) -> dict:
        """
        Main entry point for analysis.

        Args:
            df: DataFrame with 'Resistance' column (see _standardize_input for
                the fallbacks applied when that column is missing).
            segments: Dictionary containing phase start/end indices.
                Expected keys: 'phase2_start', 'phase2_end', 'phase3_start',
                'phase3_end', 'phase4_start', 'phase4_end'.
                Optional key 'valid' (defaults to True); a falsy value skips
                feature extraction and defect detection.
            kpis: Optional dictionary of Key Performance Indicators.

        Returns:
            Report dict with 'Fault_Detection' (list of defect dicts) and
            'advanced_analysis' (health score, status, physics metrics).
        """
        if kpis is None:
            kpis = {}

        # 1. Standardize Input
        df_std = self._standardize_input(df)
        resistance = df_std['Resistance'].values

        # 2. Signal Preprocessing (Scipy)
        # Smoothing is best-effort: fall back to the raw trace when the filter
        # cannot be applied (e.g. fewer samples than the window length).
        resistance_smooth = resistance
        if len(resistance) >= self.SAVGOL_WINDOW:
            try:
                resistance_smooth = savgol_filter(
                    resistance, self.SAVGOL_WINDOW, self.SAVGOL_POLYORDER
                )
            except (ValueError, TypeError):
                resistance_smooth = resistance

        # Derivatives (np.gradient needs at least two samples)
        if len(resistance_smooth) >= 2:
            res_velocity = np.gradient(resistance_smooth)
            res_acceleration = np.gradient(res_velocity)
        else:
            res_velocity = np.zeros(len(resistance_smooth))
            res_acceleration = np.zeros(len(resistance_smooth))

        # 3. Advanced Feature Extraction (Using provided segments)
        features = self._extract_advanced_features(
            resistance, resistance_smooth, res_velocity, res_acceleration, segments, kpis
        )

        # 4. Heuristic Defect Detection (12 Classes)
        defects = self._detect_defects(features, kpis)

        # 5. Construct Report
        return self._build_report(features, defects)

    def _standardize_input(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Return a single-column DataFrame named 'Resistance' with a fresh index.

        Resolution order:
        1. Wide layout (more than 100 columns and fewer than 10 rows): the
           first row is taken as the resistance trace.
        2. An existing 'Resistance' column.
        3. The first column whose name contains 'res' or 'uohm'.
        4. The first column whose name does not start with 't'.

        Raises:
            ValueError: if no candidate column can be identified.
        """
        # The wide layout is checked first so that it no longer depends on the
        # column labels (they may be integers, or all start with 't').
        if df.shape[1] > 100 and df.shape[0] < 10:
            return pd.DataFrame({'Resistance': df.iloc[0].values})

        if 'Resistance' not in df.columns:
            # str() so non-string labels (e.g. integer columns) do not crash.
            lowered = [(col, str(col).lower()) for col in df.columns]
            candidates = [col for col, low in lowered if 'res' in low or 'uohm' in low]
            if not candidates:
                candidates = [col for col, low in lowered if not low.startswith('t')]
            if not candidates:
                raise ValueError("Could not identify Resistance column")
            df = df.rename(columns={candidates[0]: 'Resistance'})

        return df[['Resistance']].reset_index(drop=True)

    def _arc_energy_and_spikes(self, r_zone, test_current):
        """Return (energy_joules, critical_spike_count, severe_spike_count) for a non-empty arc zone."""
        # P = I^2 * R with R converted from µΩ to Ω; energy = sum(P) * sample period.
        power = (test_current ** 2) * r_zone * 1e-6
        energy = float(np.sum(power) * (1.0 / self.SAMPLE_RATE_HZ))
        critical = int(np.sum(r_zone > self.ARCING_SPIKE_CRITICAL))
        severe = int(np.sum(r_zone > self.ARCING_SPIKE_SEVERE))
        return energy, critical, severe

    def _extract_advanced_features(self, r_raw, r_smooth, r_vel, r_acc, seg, kpis):
        """
        Extracts comprehensive features for heuristic scoring.

        Args:
            r_raw: Raw resistance samples (1-D array).
            r_smooth: Smoothed resistance (accepted for interface
                compatibility; not read here).
            r_vel: First derivative of the smoothed trace (not read here).
            r_acc: Second derivative of the smoothed trace; used for the jerk
                index over phase 3.
            seg: Segment dict with the six phase index keys and an optional
                'valid' flag (missing means valid).
            kpis: KPI dict; 'Test Current (A)' is read (default 100.0).

        Returns:
            Feature dict. Contains only {'valid_data': False} when the
            segmentation is flagged invalid.
        """
        features = {}

        # 'valid' is optional: callers that pass only the documented phase
        # keys are treated as valid instead of failing with KeyError.
        if not seg.get('valid', True):
            features['valid_data'] = False
            return features
        features['valid_data'] = True

        # --- A. MAIN CONTACT (Phase 3) ---
        p3_slice = slice(seg['phase3_start'], seg['phase3_end'])
        r_p3 = r_raw[p3_slice]

        if len(r_p3) > 0:
            features['main_mean'] = float(np.mean(r_p3))
            features['main_std'] = float(np.std(r_p3))
            features['main_min'] = float(np.min(r_p3))
            features['main_max'] = float(np.max(r_p3))
            features['main_range'] = float(features['main_max'] - features['main_min'])

            # Detrend for roughness analysis. A linear fit needs two points;
            # with a single sample only the mean is removed.
            if len(r_p3) >= 2:
                x = np.arange(len(r_p3))
                trend = np.polyfit(x, r_p3, 1)
                detrended = r_p3 - np.polyval(trend, x)
            else:
                detrended = r_p3 - np.mean(r_p3)

            features['roughness_rms'] = float(np.sqrt(np.mean(detrended ** 2)))
            features['roughness_skew'] = float(skew(detrended)) if len(detrended) > 2 else 0
            features['roughness_kurtosis'] = float(kurtosis(detrended)) if len(detrended) > 2 else 0

            # FFT for Chatter (50-300Hz)
            if len(detrended) > 32:
                # Cap nperseg at the signal length: same result as SciPy's own
                # fallback, without the warning it emits for short inputs.
                freqs, psd = welch(
                    detrended,
                    fs=self.SAMPLE_RATE_HZ,
                    nperseg=min(256, len(detrended)),
                )
                chatter_band = (freqs >= 50) & (freqs <= 300)
                # x is passed by keyword: positional use is deprecated in some
                # SciPy releases.
                features['chatter_power'] = float(
                    simpson(psd[chatter_band], x=freqs[chatter_band])
                )
            else:
                features['chatter_power'] = 0.0

            # Telegraph Noise (Square Jumps)
            diffs = np.abs(np.diff(r_p3))
            features['telegraph_jumps'] = int(np.sum(diffs > 120))  # Jump > 120uOhm

            # Shelf Detection (Histogram)
            hist, _ = np.histogram(r_p3, bins=10)
            features['num_shelves'] = int(np.sum(hist > len(r_p3) * 0.1))  # Bins with >10% data
        else:
            # Every key that _detect_defects reads must exist, otherwise an
            # empty phase 3 raises KeyError during scoring.
            features.update({
                'main_mean': 9999,
                'main_std': 0.0,
                'main_range': 0.0,
                'roughness_rms': 0,
                'roughness_skew': 0,
                'roughness_kurtosis': 0,
                'chatter_power': 0,
                'telegraph_jumps': 0,
                'num_shelves': 0,
            })

        # --- B. ARCING ZONES (Phase 2 & 4) ---
        test_current = kpis.get('Test Current (A)', 100.0)

        # Closing Arc
        p2_slice = slice(seg['phase2_start'], seg['phase2_end'])
        r_p2 = r_raw[p2_slice]
        features['closing_arc_duration'] = len(r_p2)

        if len(r_p2) > 0:
            energy, critical, severe = self._arc_energy_and_spikes(r_p2, test_current)
            features['closing_arc_energy_joules'] = energy
            features['closing_critical_spikes'] = critical
            features['closing_severe_spikes'] = severe
        else:
            features.update({
                'closing_arc_energy_joules': 0,
                'closing_critical_spikes': 0,
                'closing_severe_spikes': 0,
            })

        # Opening Arc
        p4_slice = slice(seg['phase4_start'], seg['phase4_end'])
        r_p4 = r_raw[p4_slice]
        features['opening_arc_duration'] = len(r_p4)

        if len(r_p4) > 0:
            energy, critical, severe = self._arc_energy_and_spikes(r_p4, test_current)
            features['opening_arc_energy_joules'] = energy
            features['opening_critical_spikes'] = critical
            features['opening_severe_spikes'] = severe

            # Bounce Detection (Find Peaks)
            peaks, _ = find_peaks(r_p4, prominence=self.BOUNCE_PROMINENCE, distance=5)
            features['num_bounces'] = len(peaks)

            # Telegraph in Arcing
            features['arcing_telegraph'] = int(np.sum(np.abs(np.diff(r_p4)) > 400))
        else:
            features.update({
                'opening_arc_energy_joules': 0,
                'opening_critical_spikes': 0,
                'opening_severe_spikes': 0,
                'num_bounces': 0,
                'arcing_telegraph': 0,
            })

        # --- C. KINEMATICS ---
        features['dur_closing'] = len(r_p2)
        features['dur_opening'] = len(r_p4)
        features['asymmetry_ratio'] = float(
            features['dur_opening'] / max(1, features['dur_closing'])
        )

        acc_p3 = r_acc[p3_slice] if len(r_p3) > 0 else []
        features['max_micro_jerk'] = float(np.max(np.abs(acc_p3))) if len(acc_p3) > 0 else 0.0

        return features

    def _detect_defects(self, f, kpis):
        """
        Applies Heuristic Scoring Logic for 12 Defect Classes.
        Returns list of defects with 'Confidence' and 'Evidence'.

        Each scored class accumulates points from its rules and is reported
        only when the total exceeds 40. Returns an empty list when the
        features are flagged invalid.
        """
        defects = []
        if not f.get('valid_data', False):
            return defects

        # --- CLASS 1: HEALTHY (Implicit - if no defects found) ---

        # --- CLASS 2: MAIN CONTACT WEAR ---
        score = 0
        evidence = []
        if f['main_mean'] > self.R_MAIN_CRITICAL:
            score += 50
            evidence.append(f"Critical Resistance ({f['main_mean']:.1f} µΩ)")
        elif f['main_mean'] > self.R_MAIN_WARNING:
            score += 30
            evidence.append(f"Elevated Resistance ({f['main_mean']:.1f} µΩ)")

        if f['roughness_rms'] > 25:
            score += 25
            evidence.append(f"Severe Surface Roughness (RMS {f['roughness_rms']:.1f})")
        elif f['roughness_rms'] > 15:
            score += 15
            evidence.append(f"Moderate Roughness (RMS {f['roughness_rms']:.1f})")

        if f['roughness_skew'] > 1.5:
            score += 10
            evidence.append("Positive Skew indicates pitting")

        if score > 40:
            defects.append(self._make_defect("Main Contact Wear", score, evidence))

        # --- CLASS 3: ARCING CONTACT WEAR ---
        score = 0
        evidence = []
        total_spikes = f['closing_critical_spikes'] + f['opening_critical_spikes']
        total_energy = f['closing_arc_energy_joules'] + f['opening_arc_energy_joules']

        if total_spikes >= 4:
            score += 50
            evidence.append(f"{total_spikes} Critical Arc Flashes (>8000µΩ)")
        elif f['closing_severe_spikes'] + f['opening_severe_spikes'] >= 3:
            score += 40
            evidence.append("Multiple Severe Spikes (>5000µΩ)")

        if total_energy > self.ARCING_ENERGY_CRITICAL:
            score += 30
            evidence.append(f"Critical Arc Energy ({total_energy:.1f} J)")

        if score > 40:
            defects.append(self._make_defect("Arcing Contact Wear", score, evidence))

        # --- CLASS 4: MAIN CONTACT MISALIGNMENT ---
        score = 0
        evidence = []
        if f['telegraph_jumps'] >= 6:
            score += 45
            evidence.append(f"Telegraph Pattern: {f['telegraph_jumps']} square jumps")
        elif f['telegraph_jumps'] >= 3:
            score += 25
            evidence.append(f"Partial Telegraph: {f['telegraph_jumps']} jumps")

        if f['num_shelves'] >= 3:
            score += 20
            evidence.append(f"Stepped Shelves: {f['num_shelves']} plateaus")

        if f['main_std'] > 70:
            score += 15
            evidence.append(f"High Instability (Std {f['main_std']:.1f})")

        if score > 40:
            defects.append(self._make_defect("Main Contact Misalignment", score, evidence))

        # --- CLASS 5: ARCING CONTACT MISALIGNMENT ---
        score = 0
        evidence = []
        if f['asymmetry_ratio'] > 2.2:
            score += 35
            evidence.append(f"Severe Asymmetry (Opening {f['asymmetry_ratio']:.1f}x Closing)")
        elif f['asymmetry_ratio'] > 1.6:
            score += 20
            evidence.append(f"Moderate Asymmetry ({f['asymmetry_ratio']:.1f}x)")

        if f['num_bounces'] >= 5:
            score += 30
            evidence.append(f"Mechanical Oscillation: {f['num_bounces']} bounces")
        elif f['num_bounces'] >= 3:
            score += 15
            evidence.append(f"{f['num_bounces']} bounces detected")

        if f['arcing_telegraph'] > 10:
            score += 15
            evidence.append("High-freq telegraph in arcing zone")

        if score > 40:
            defects.append(self._make_defect("Arcing Contact Misalignment", score, evidence))

        # --- CLASS 6: OPERATING MECHANISM (Timing) ---
        # Requires KPIs
        # NOTE(review): a KPI value of 0 is treated the same as "missing" by
        # the truthiness checks below (here and for DLRO / coil currents) —
        # confirm that is intended. Also, a single timing deviation scores
        # exactly 40 and therefore does not pass the `> 40` gate on its own.
        score = 0
        evidence = []
        c_time = kpis.get('Closing Time (ms)')
        o_time = kpis.get('Opening Time (ms)')

        if c_time and (c_time > 120 or c_time < 64):
            score += 40
            evidence.append(f"Closing Time Deviation ({c_time}ms)")
        if o_time and (o_time > 48 or o_time < 24):
            score += 40
            evidence.append(f"Opening Time Deviation ({o_time}ms)")

        if score > 40:
            defects.append(self._make_defect("Operating Mechanism Malfunction", score, evidence))

        # --- CLASS 7: DAMPING SYSTEM FAULT ---
        score = 0
        evidence = []
        if f['num_bounces'] > 7:
            score += 80
            evidence.append(f"Excessive Bouncing ({f['num_bounces']} peaks) - Damper failure")
        elif f['num_bounces'] > 5:
            score += 50
            evidence.append(f"High Bouncing ({f['num_bounces']} peaks)")

        if score > 40:
            defects.append(self._make_defect("Damping System Fault", score, evidence))

        # (No rule labelled CLASS 8 exists in this method.)

        # --- CLASS 9: LINKAGE/ROD OBSTRUCTION ---
        score = 0
        evidence = []
        if f['max_micro_jerk'] > self.JERK_THRESHOLD:
            score += 60
            evidence.append(f"High Kinematic Jerk ({f['max_micro_jerk']:.1f}) - Stick-slip friction")

        if score > 40:
            defects.append(self._make_defect("Linkage/Rod Obstruction", score, evidence))

        # --- CLASS 10: FIXED CONTACT DAMAGE ---
        score = 0
        evidence = []
        dlro = kpis.get('DLRO Value (µΩ)')
        if dlro and dlro > 80 and f['roughness_rms'] < 15:
            # High resistance but smooth curve = Fixed contact issue
            score += 85
            evidence.append(f"High DLRO ({dlro}µΩ) with Smooth Curve (Fixed Contact)")

        if score > 40:
            defects.append(self._make_defect("Fixed Contact Damage", score, evidence))

        # --- CLASS 11/12: COIL DAMAGE ---
        # Simple threshold checks
        cc = kpis.get('Peak Close Coil Current (A)')
        if cc and cc < 2.0:
            defects.append(self._make_defect("Close Coil Damage", 95, [f"Current {cc}A < 2A"]))

        tc1 = kpis.get('Peak Trip Coil 1 Current (A)')
        tc2 = kpis.get('Peak Trip Coil 2 Current (A)')
        if tc1 and tc2 and tc1 < 2.0 and tc2 < 2.0:
            defects.append(self._make_defect(
                "Trip Coil Damage", 95, [f"Both Coils Failed (TC1:{tc1}A, TC2:{tc2}A)"]
            ))

        return defects

    def _make_defect(self, name, score, evidence):
        """Build one defect entry; confidence is capped at 99.9 and severity is 'High' above 70."""
        return {
            "defect_name": name,
            "Confidence": f"{min(99.9, score):.1f} %",
            "Severity": "High" if score > 70 else "Medium",
            "description": "; ".join(evidence)
        }

    def _build_report(self, features, defects):
        """
        Constructs the final JSON report.

        Health score starts at 100 and loses 30 per 'High' and 15 per 'Medium'
        defect (floored at 0). Status is 'Critical' below 50, 'Warning' below
        80, otherwise 'Healthy'.

        NOTE(review): features flagged invalid produce no defects, so the
        report reads 100 / 'Healthy' in that case — confirm that is desired.
        """
        # Calculate Overall Health Score
        health_score = 100
        for d in defects:
            sev = d['Severity']
            if sev == 'High':
                health_score -= 30
            elif sev == 'Medium':
                health_score -= 15
        health_score = max(0, health_score)

        status = "Healthy"
        if health_score < 50:
            status = "Critical"
        elif health_score < 80:
            status = "Warning"

        # Sort defects by confidence (new list; the caller's list is not mutated)
        ranked = sorted(
            defects,
            key=lambda x: float(x['Confidence'].replace('%', '')),
            reverse=True,
        )

        return {
            "Fault_Detection": ranked,
            "advanced_analysis": {
                "health_score": health_score,
                "status": status,
                "physics_metrics": {
                    "main_contact_resistance_uohm": round(features.get('main_mean', 0), 2),
                    "surface_roughness_rms": round(features.get('roughness_rms', 0), 2),
                    "arc_energy_joules": round(
                        features.get('closing_arc_energy_joules', 0)
                        + features.get('opening_arc_energy_joules', 0),
                        2,
                    ),
                    "mechanical_chatter_power": round(features.get('chatter_power', 0), 2),
                    "kinematic_jerk_index": round(features.get('max_micro_jerk', 0), 2),
                    "telegraph_jumps": features.get('telegraph_jumps', 0),
                    "bounces": features.get('num_bounces', 0)
                }
            }
        }
if __name__ == "__main__":
    # Test Block: synthetic trace, one sample per ms.
    np.random.seed(0)  # reproducible demo output
    t = np.linspace(0, 400, 401)
    r = np.ones_like(t) * 100000
    r[100:300] = 40 + np.random.normal(0, 2, 200)

    # Add synthetic defects
    r[150:200] += 10 * np.sin(2 * np.pi * 60 * t[150:200] / 1000)  # Chatter
    r[305:315] = 4000  # Bounce

    df = pd.DataFrame({'T_ms': t, 'Resistance': r})

    # analyze() requires the phase boundaries; these match the synthetic trace
    # above (main contact plateau at 100-300, bounce inside the opening zone).
    segments = {
        'valid': True,
        'phase2_start': 90, 'phase2_end': 100,
        'phase3_start': 100, 'phase3_end': 300,
        'phase4_start': 300, 'phase4_end': 320,
    }

    engine = AdvancedDCRMEngine()
    report = engine.analyze(df, segments)
    print(json.dumps(report, indent=2))