"""Synthetic bearing-fault vibration simulator for FactoryFlow.
Generates 512-sample time-domain windows at 10 kHz that mimic the vibration
signature of a 6205 ball bearing. As the simulator moves from the ``normal``
state through ``degrading`` to ``imminent_failure``, it injects a growing
sinusoid (plus its 2x harmonic) at the BPFO frequency (~85 Hz at 1800 RPM),
which MOMENT later flags as a reconstruction anomaly.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal
import numpy as np
import structlog
# Module-level structlog logger used by the simulator's log.info/log.debug calls.
log = structlog.get_logger()
# --- Simulator states -------------------------------------------------------
# Closed set of state names; STATE_PROFILES below has one entry per name.
State = Literal["normal", "degrading", "imminent_failure"]

# --- Window geometry --------------------------------------------------------
# Default number of samples per emitted window.
WINDOW_SIZE: int = 512
# Default sampling rate used to build the time axis and the FFT frequency axis.
SAMPLE_RATE_HZ: float = 10_000.0

# --- Fault model ------------------------------------------------------------
# Default frequency of the injected fault sinusoid.
BEARING_FAULT_FREQ_HZ: float = 85.0
# Amount added to the degradation level per window while in "degrading".
DEGRADATION_RAMP_PER_TICK: float = 0.01

# Per-state tuning:
#   degradation_floor - lower bound applied to the degradation level when the
#                       state is entered ("normal" resets the level to it)
#   noise_scale       - multiplier on the broadband-noise standard deviation
STATE_PROFILES: dict[str, dict[str, float]] = {
    "normal": {
        "degradation_floor": 0.05,
        "noise_scale": 1.0,
    },
    "degrading": {
        "degradation_floor": 0.0,
        "noise_scale": 1.2,
    },
    "imminent_failure": {
        "degradation_floor": 0.92,
        "noise_scale": 1.5,
    },
}
@dataclass
class SensorWindow:
    """One emitted sensor window together with its summary statistics."""

    # Timestamp string supplied by the producer of the window.
    timestamp: str
    # Name of the simulator state the window was generated in.
    state_label: str
    # Window samples as plain floats.
    # NOTE(review): despite the name, generate_window fills this with the
    # time-domain samples, not FFT magnitudes.
    fft_window: list[float]
    # Frequency of the strongest non-DC spectral bin.
    dominant_freq_hz: float
    # Root-mean-square of the window samples.
    rms_velocity: float

    def to_dict(self) -> dict:
        """Return the window as a plain dict keyed by field name.

        The ``fft_window`` list is handed back by reference, not copied.
        """
        exported = (
            "timestamp",
            "state_label",
            "fft_window",
            "dominant_freq_hz",
            "rms_velocity",
        )
        return {name: getattr(self, name) for name in exported}
@dataclass
class BearingFaultSimulator:
    """Stateful generator of synthetic bearing-vibration windows.

    Each call to :meth:`generate_window` advances the degradation level for
    the current state, synthesises one window (Gaussian noise, a fixed 30 Hz
    sinusoid, the fault sinusoid with its 2x harmonic and, in
    ``imminent_failure``, an occasional single-sample spike) and returns it
    as a ``SensorWindow``.

    The default random generator is seeded with 42, so instances created
    without an explicit ``_rng`` draw the same random sequence.
    """

    state: State = "normal"
    # Fault severity; the injected fault amplitude is 0.6 times this value.
    degradation_level: float = 0.05
    sample_rate_hz: float = SAMPLE_RATE_HZ
    window_size: int = WINDOW_SIZE
    fault_freq_hz: float = BEARING_FAULT_FREQ_HZ
    _rng: np.random.Generator = field(
        default_factory=lambda: np.random.default_rng(seed=42)
    )
    # Number of windows generated so far (only used in the debug log).
    _tick: int = 0

    def set_state(self, state: str) -> None:
        """Switch to ``state`` and apply that state's degradation floor.

        Entering ``normal`` resets the degradation level to the floor; any
        other state only raises the level to the floor if it is below it.
        A ``state_change`` event is logged with the previous and new state.

        Raises:
            ValueError: if ``state`` is not a key of ``STATE_PROFILES``.
        """
        if state not in STATE_PROFILES:
            raise ValueError(
                f"unknown state '{state}'; must be one of {list(STATE_PROFILES)}"
            )
        previous = self.state
        self.state = state  # type: ignore[assignment]
        floor = STATE_PROFILES[state]["degradation_floor"]
        if state == "normal":
            # Returning to normal is the only transition that lowers the level.
            self.degradation_level = floor
        else:
            self.degradation_level = max(self.degradation_level, floor)
        log.info(
            "state_change",
            component="sensor.simulator",
            previous=previous,
            new=state,
            degradation_level=round(self.degradation_level, 3),
        )

    def _advance_degradation(self) -> None:
        """Ramp the degradation level by one tick for the current state.

        ``degrading`` adds ``DEGRADATION_RAMP_PER_TICK`` up to a cap of 0.85;
        ``imminent_failure`` adds 0.005 up to a cap of 0.98; ``normal`` leaves
        the level untouched.
        """
        if self.state == "degrading":
            ramped = min(
                0.85, self.degradation_level + DEGRADATION_RAMP_PER_TICK
            )
            # The cap only stops further growth. A level already above it
            # (e.g. after stepping back from imminent_failure, where
            # set_state keeps the higher level) is held rather than being
            # pulled down to 0.85 on the next window.
            self.degradation_level = max(self.degradation_level, ramped)
        elif self.state == "imminent_failure":
            self.degradation_level = min(0.98, self.degradation_level + 0.005)
        # normal: leave at floor

    def inject_fault_peak(
        self, signal: np.ndarray, freq_hz: float, amplitude: float
    ) -> np.ndarray:
        """Return ``signal`` plus a sinusoid at ``freq_hz`` and its 2x harmonic.

        The fundamental has the given ``amplitude`` and the harmonic 0.4 times
        that. Both share one phase drawn uniformly from [0, 2*pi) using the
        simulator's random generator. The result is a new array; ``signal``
        itself is not modified.
        """
        t = np.arange(signal.size, dtype=np.float64) / self.sample_rate_hz
        phase = self._rng.uniform(0.0, 2 * math.pi)
        # Add fundamental + 2x harmonic — bearing faults excite harmonics too.
        peak = amplitude * np.sin(2 * math.pi * freq_hz * t + phase)
        peak += 0.4 * amplitude * np.sin(2 * math.pi * 2 * freq_hz * t + phase)
        return signal + peak

    def generate_window(self) -> SensorWindow:
        """Advance the simulation by one tick and return the new window.

        The order of random draws (noise, fault phase, spike decision, spike
        index, spike sign) determines the output for a given seed.
        """
        self._advance_degradation()
        self._tick += 1
        profile = STATE_PROFILES[self.state]
        noise_scale = profile["noise_scale"]

        # Broadband mechanical noise (healthy bearing baseline).
        signal = self._rng.normal(0.0, 0.05 * noise_scale, size=self.window_size)

        # Always include a small running-machine 30 Hz shaft component.
        t = np.arange(self.window_size, dtype=np.float64) / self.sample_rate_hz
        signal += 0.08 * np.sin(2 * math.pi * 30.0 * t)

        # Inject the fault peak scaled by current degradation.
        fault_amplitude = 0.6 * self.degradation_level
        if fault_amplitude > 0.01:
            signal = self.inject_fault_peak(
                signal, self.fault_freq_hz, fault_amplitude
            )

        # Impulsive transients spike during imminent failure: with
        # probability 0.5 one random sample is offset by +/-0.7.
        if self.state == "imminent_failure" and self._rng.random() < 0.5:
            idx = int(self._rng.integers(0, self.window_size))
            signal[idx] += self._rng.choice([-1.0, 1.0]) * 0.7

        dominant_hz, rms = _spectral_stats(signal, self.sample_rate_hz)
        window = SensorWindow(
            # UTC ISO-8601 timestamp with a trailing "Z" instead of "+00:00".
            timestamp=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            state_label=self.state,
            fft_window=[float(x) for x in signal],
            dominant_freq_hz=float(dominant_hz),
            rms_velocity=float(rms),
        )
        log.debug(
            "window_emitted",
            component="sensor.simulator",
            tick=self._tick,
            state=self.state,
            degradation=round(self.degradation_level, 3),
            dominant_hz=round(dominant_hz, 1),
            rms=round(rms, 3),
        )
        return window
def _spectral_stats(signal: np.ndarray, sample_rate_hz: float) -> tuple[float, float]:
    """Return ``(dominant_hz, rms)`` for a real-valued signal.

    ``dominant_hz`` is the frequency of the largest-magnitude bin of the real
    FFT, excluding the DC bin; it is 0.0 when the spectrum has no non-DC bin.
    ``rms`` is the root-mean-square of the samples.

    The input is converted to float64 first, so integer arrays do not
    overflow when squared. An empty signal yields ``(0.0, 0.0)`` instead of
    the ``ValueError`` that ``np.fft.rfft`` raises for zero samples.

    NOTE(review): a 1-D signal is assumed; verify against callers.
    """
    samples = np.asarray(signal, dtype=np.float64)
    if samples.size == 0:
        return 0.0, 0.0

    magnitudes = np.abs(np.fft.rfft(samples))
    bin_freqs = np.fft.rfftfreq(samples.size, d=1.0 / sample_rate_hz)

    # Ignore DC component when picking dominant frequency.
    if magnitudes.size > 1:
        peak_bin = 1 + int(np.argmax(magnitudes[1:]))
        dominant_hz = float(bin_freqs[peak_bin])
    else:
        dominant_hz = 0.0

    rms = float(np.sqrt(np.mean(np.square(samples))))
    return dominant_hz, rms
|