factoryflow / src /sensor /simulator.py
oabolade23's picture
FactoryFlow demo — initial submission
8ad9950
Raw
History Blame Contribute Delete
5.66 kB
"""Synthetic bearing-fault vibration simulator for FactoryFlow.
Generates 512-sample time-domain windows at 10 kHz that mimic the vibration
signature of a 6205 ball bearing. State transitions between healthy and
imminent_failure inject a growing sinusoid at the BPFO frequency (~85 Hz at
1800 RPM), which MOMENT later flags as a reconstruction anomaly.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal
import numpy as np
import structlog
log = structlog.get_logger()
WINDOW_SIZE: int = 512
SAMPLE_RATE_HZ: float = 10_000.0
BEARING_FAULT_FREQ_HZ: float = 85.0
DEGRADATION_RAMP_PER_TICK: float = 0.01
State = Literal["normal", "degrading", "imminent_failure"]
STATE_PROFILES: dict[str, dict[str, float]] = {
"normal": {"degradation_floor": 0.05, "noise_scale": 1.0},
"degrading": {"degradation_floor": 0.0, "noise_scale": 1.2},
"imminent_failure": {"degradation_floor": 0.92, "noise_scale": 1.5},
}
@dataclass
class SensorWindow:
timestamp: str
state_label: str
fft_window: list[float]
dominant_freq_hz: float
rms_velocity: float
def to_dict(self) -> dict:
return {
"timestamp": self.timestamp,
"state_label": self.state_label,
"fft_window": self.fft_window,
"dominant_freq_hz": self.dominant_freq_hz,
"rms_velocity": self.rms_velocity,
}
@dataclass
class BearingFaultSimulator:
state: State = "normal"
degradation_level: float = 0.05
sample_rate_hz: float = SAMPLE_RATE_HZ
window_size: int = WINDOW_SIZE
fault_freq_hz: float = BEARING_FAULT_FREQ_HZ
_rng: np.random.Generator = field(
default_factory=lambda: np.random.default_rng(seed=42)
)
_tick: int = 0
def set_state(self, state: str) -> None:
if state not in STATE_PROFILES:
raise ValueError(
f"unknown state '{state}'; must be one of {list(STATE_PROFILES)}"
)
previous = self.state
self.state = state # type: ignore[assignment]
floor = STATE_PROFILES[state]["degradation_floor"]
self.degradation_level = max(self.degradation_level, floor)
if state == "normal":
self.degradation_level = floor
log.info(
"state_change",
component="sensor.simulator",
previous=previous,
new=state,
degradation_level=round(self.degradation_level, 3),
)
def _advance_degradation(self) -> None:
if self.state == "degrading":
self.degradation_level = min(
0.85, self.degradation_level + DEGRADATION_RAMP_PER_TICK
)
elif self.state == "imminent_failure":
self.degradation_level = min(0.98, self.degradation_level + 0.005)
# normal: leave at floor
def inject_fault_peak(
self, signal: np.ndarray, freq_hz: float, amplitude: float
) -> np.ndarray:
t = np.arange(signal.size, dtype=np.float64) / self.sample_rate_hz
phase = self._rng.uniform(0.0, 2 * math.pi)
# Add fundamental + 2x harmonic — bearing faults excite harmonics too.
peak = amplitude * np.sin(2 * math.pi * freq_hz * t + phase)
peak += 0.4 * amplitude * np.sin(2 * math.pi * 2 * freq_hz * t + phase)
return signal + peak
def generate_window(self) -> SensorWindow:
self._advance_degradation()
self._tick += 1
profile = STATE_PROFILES[self.state]
noise_scale = profile["noise_scale"]
# Broadband mechanical noise (healthy bearing baseline).
signal = self._rng.normal(0.0, 0.05 * noise_scale, size=self.window_size)
# Always include a small running-machine 30 Hz shaft component.
t = np.arange(self.window_size, dtype=np.float64) / self.sample_rate_hz
signal += 0.08 * np.sin(2 * math.pi * 30.0 * t)
# Inject the fault peak scaled by current degradation.
fault_amplitude = 0.6 * self.degradation_level
if fault_amplitude > 0.01:
signal = self.inject_fault_peak(
signal, self.fault_freq_hz, fault_amplitude
)
# Impulsive transients spike during imminent failure.
if self.state == "imminent_failure" and self._rng.random() < 0.5:
idx = int(self._rng.integers(0, self.window_size))
signal[idx] += self._rng.choice([-1.0, 1.0]) * 0.7
dominant_hz, rms = _spectral_stats(signal, self.sample_rate_hz)
window = SensorWindow(
timestamp=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
state_label=self.state,
fft_window=[float(x) for x in signal],
dominant_freq_hz=float(dominant_hz),
rms_velocity=float(rms),
)
log.debug(
"window_emitted",
component="sensor.simulator",
tick=self._tick,
state=self.state,
degradation=round(self.degradation_level, 3),
dominant_hz=round(dominant_hz, 1),
rms=round(rms, 3),
)
return window
def _spectral_stats(signal: np.ndarray, sample_rate_hz: float) -> tuple[float, float]:
spectrum = np.abs(np.fft.rfft(signal))
freqs = np.fft.rfftfreq(signal.size, d=1.0 / sample_rate_hz)
# Ignore DC component when picking dominant frequency.
if spectrum.size > 1:
dominant_hz = float(freqs[1 + int(np.argmax(spectrum[1:]))])
else:
dominant_hz = 0.0
rms = float(np.sqrt(np.mean(signal**2)))
return dominant_hz, rms