Spaces:
Sleeping
Sleeping
| """Synthetic bearing-fault vibration simulator for FactoryFlow. | |
| Generates 512-sample time-domain windows at 10 kHz that mimic the vibration | |
| signature of a 6205 ball bearing. State transitions between healthy and | |
| imminent_failure inject a growing sinusoid at the BPFO frequency (~85 Hz at | |
| 1800 RPM), which MOMENT later flags as a reconstruction anomaly. | |
| """ | |
| from __future__ import annotations | |
| import math | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from typing import Literal | |
| import numpy as np | |
| import structlog | |
| log = structlog.get_logger() | |
| WINDOW_SIZE: int = 512 | |
| SAMPLE_RATE_HZ: float = 10_000.0 | |
| BEARING_FAULT_FREQ_HZ: float = 85.0 | |
| DEGRADATION_RAMP_PER_TICK: float = 0.01 | |
| State = Literal["normal", "degrading", "imminent_failure"] | |
| STATE_PROFILES: dict[str, dict[str, float]] = { | |
| "normal": {"degradation_floor": 0.05, "noise_scale": 1.0}, | |
| "degrading": {"degradation_floor": 0.0, "noise_scale": 1.2}, | |
| "imminent_failure": {"degradation_floor": 0.92, "noise_scale": 1.5}, | |
| } | |
| class SensorWindow: | |
| timestamp: str | |
| state_label: str | |
| fft_window: list[float] | |
| dominant_freq_hz: float | |
| rms_velocity: float | |
| def to_dict(self) -> dict: | |
| return { | |
| "timestamp": self.timestamp, | |
| "state_label": self.state_label, | |
| "fft_window": self.fft_window, | |
| "dominant_freq_hz": self.dominant_freq_hz, | |
| "rms_velocity": self.rms_velocity, | |
| } | |
| class BearingFaultSimulator: | |
| state: State = "normal" | |
| degradation_level: float = 0.05 | |
| sample_rate_hz: float = SAMPLE_RATE_HZ | |
| window_size: int = WINDOW_SIZE | |
| fault_freq_hz: float = BEARING_FAULT_FREQ_HZ | |
| _rng: np.random.Generator = field( | |
| default_factory=lambda: np.random.default_rng(seed=42) | |
| ) | |
| _tick: int = 0 | |
| def set_state(self, state: str) -> None: | |
| if state not in STATE_PROFILES: | |
| raise ValueError( | |
| f"unknown state '{state}'; must be one of {list(STATE_PROFILES)}" | |
| ) | |
| previous = self.state | |
| self.state = state # type: ignore[assignment] | |
| floor = STATE_PROFILES[state]["degradation_floor"] | |
| self.degradation_level = max(self.degradation_level, floor) | |
| if state == "normal": | |
| self.degradation_level = floor | |
| log.info( | |
| "state_change", | |
| component="sensor.simulator", | |
| previous=previous, | |
| new=state, | |
| degradation_level=round(self.degradation_level, 3), | |
| ) | |
| def _advance_degradation(self) -> None: | |
| if self.state == "degrading": | |
| self.degradation_level = min( | |
| 0.85, self.degradation_level + DEGRADATION_RAMP_PER_TICK | |
| ) | |
| elif self.state == "imminent_failure": | |
| self.degradation_level = min(0.98, self.degradation_level + 0.005) | |
| # normal: leave at floor | |
| def inject_fault_peak( | |
| self, signal: np.ndarray, freq_hz: float, amplitude: float | |
| ) -> np.ndarray: | |
| t = np.arange(signal.size, dtype=np.float64) / self.sample_rate_hz | |
| phase = self._rng.uniform(0.0, 2 * math.pi) | |
| # Add fundamental + 2x harmonic — bearing faults excite harmonics too. | |
| peak = amplitude * np.sin(2 * math.pi * freq_hz * t + phase) | |
| peak += 0.4 * amplitude * np.sin(2 * math.pi * 2 * freq_hz * t + phase) | |
| return signal + peak | |
| def generate_window(self) -> SensorWindow: | |
| self._advance_degradation() | |
| self._tick += 1 | |
| profile = STATE_PROFILES[self.state] | |
| noise_scale = profile["noise_scale"] | |
| # Broadband mechanical noise (healthy bearing baseline). | |
| signal = self._rng.normal(0.0, 0.05 * noise_scale, size=self.window_size) | |
| # Always include a small running-machine 30 Hz shaft component. | |
| t = np.arange(self.window_size, dtype=np.float64) / self.sample_rate_hz | |
| signal += 0.08 * np.sin(2 * math.pi * 30.0 * t) | |
| # Inject the fault peak scaled by current degradation. | |
| fault_amplitude = 0.6 * self.degradation_level | |
| if fault_amplitude > 0.01: | |
| signal = self.inject_fault_peak( | |
| signal, self.fault_freq_hz, fault_amplitude | |
| ) | |
| # Impulsive transients spike during imminent failure. | |
| if self.state == "imminent_failure" and self._rng.random() < 0.5: | |
| idx = int(self._rng.integers(0, self.window_size)) | |
| signal[idx] += self._rng.choice([-1.0, 1.0]) * 0.7 | |
| dominant_hz, rms = _spectral_stats(signal, self.sample_rate_hz) | |
| window = SensorWindow( | |
| timestamp=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), | |
| state_label=self.state, | |
| fft_window=[float(x) for x in signal], | |
| dominant_freq_hz=float(dominant_hz), | |
| rms_velocity=float(rms), | |
| ) | |
| log.debug( | |
| "window_emitted", | |
| component="sensor.simulator", | |
| tick=self._tick, | |
| state=self.state, | |
| degradation=round(self.degradation_level, 3), | |
| dominant_hz=round(dominant_hz, 1), | |
| rms=round(rms, 3), | |
| ) | |
| return window | |
| def _spectral_stats(signal: np.ndarray, sample_rate_hz: float) -> tuple[float, float]: | |
| spectrum = np.abs(np.fft.rfft(signal)) | |
| freqs = np.fft.rfftfreq(signal.size, d=1.0 / sample_rate_hz) | |
| # Ignore DC component when picking dominant frequency. | |
| if spectrum.size > 1: | |
| dominant_hz = float(freqs[1 + int(np.argmax(spectrum[1:]))]) | |
| else: | |
| dominant_hz = 0.0 | |
| rms = float(np.sqrt(np.mean(signal**2))) | |
| return dominant_hz, rms | |