File size: 5,659 Bytes
8ad9950
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""Synthetic bearing-fault vibration simulator for FactoryFlow.

Generates 512-sample time-domain windows at 10 kHz that mimic the vibration
signature of a 6205 ball bearing. State transitions between healthy and
imminent_failure inject a growing sinusoid at the BPFO frequency (~85 Hz at
1800 RPM), which MOMENT later flags as a reconstruction anomaly.
"""

from __future__ import annotations

import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal

import numpy as np
import structlog

log = structlog.get_logger()

WINDOW_SIZE: int = 512
SAMPLE_RATE_HZ: float = 10_000.0
BEARING_FAULT_FREQ_HZ: float = 85.0
DEGRADATION_RAMP_PER_TICK: float = 0.01

State = Literal["normal", "degrading", "imminent_failure"]

STATE_PROFILES: dict[str, dict[str, float]] = {
    "normal": {"degradation_floor": 0.05, "noise_scale": 1.0},
    "degrading": {"degradation_floor": 0.0, "noise_scale": 1.2},
    "imminent_failure": {"degradation_floor": 0.92, "noise_scale": 1.5},
}


@dataclass
class SensorWindow:
    timestamp: str
    state_label: str
    fft_window: list[float]
    dominant_freq_hz: float
    rms_velocity: float

    def to_dict(self) -> dict:
        return {
            "timestamp": self.timestamp,
            "state_label": self.state_label,
            "fft_window": self.fft_window,
            "dominant_freq_hz": self.dominant_freq_hz,
            "rms_velocity": self.rms_velocity,
        }


@dataclass
class BearingFaultSimulator:
    state: State = "normal"
    degradation_level: float = 0.05
    sample_rate_hz: float = SAMPLE_RATE_HZ
    window_size: int = WINDOW_SIZE
    fault_freq_hz: float = BEARING_FAULT_FREQ_HZ
    _rng: np.random.Generator = field(
        default_factory=lambda: np.random.default_rng(seed=42)
    )
    _tick: int = 0

    def set_state(self, state: str) -> None:
        if state not in STATE_PROFILES:
            raise ValueError(
                f"unknown state '{state}'; must be one of {list(STATE_PROFILES)}"
            )
        previous = self.state
        self.state = state  # type: ignore[assignment]
        floor = STATE_PROFILES[state]["degradation_floor"]
        self.degradation_level = max(self.degradation_level, floor)
        if state == "normal":
            self.degradation_level = floor
        log.info(
            "state_change",
            component="sensor.simulator",
            previous=previous,
            new=state,
            degradation_level=round(self.degradation_level, 3),
        )

    def _advance_degradation(self) -> None:
        if self.state == "degrading":
            self.degradation_level = min(
                0.85, self.degradation_level + DEGRADATION_RAMP_PER_TICK
            )
        elif self.state == "imminent_failure":
            self.degradation_level = min(0.98, self.degradation_level + 0.005)
        # normal: leave at floor

    def inject_fault_peak(
        self, signal: np.ndarray, freq_hz: float, amplitude: float
    ) -> np.ndarray:
        t = np.arange(signal.size, dtype=np.float64) / self.sample_rate_hz
        phase = self._rng.uniform(0.0, 2 * math.pi)
        # Add fundamental + 2x harmonic — bearing faults excite harmonics too.
        peak = amplitude * np.sin(2 * math.pi * freq_hz * t + phase)
        peak += 0.4 * amplitude * np.sin(2 * math.pi * 2 * freq_hz * t + phase)
        return signal + peak

    def generate_window(self) -> SensorWindow:
        self._advance_degradation()
        self._tick += 1

        profile = STATE_PROFILES[self.state]
        noise_scale = profile["noise_scale"]

        # Broadband mechanical noise (healthy bearing baseline).
        signal = self._rng.normal(0.0, 0.05 * noise_scale, size=self.window_size)

        # Always include a small running-machine 30 Hz shaft component.
        t = np.arange(self.window_size, dtype=np.float64) / self.sample_rate_hz
        signal += 0.08 * np.sin(2 * math.pi * 30.0 * t)

        # Inject the fault peak scaled by current degradation.
        fault_amplitude = 0.6 * self.degradation_level
        if fault_amplitude > 0.01:
            signal = self.inject_fault_peak(
                signal, self.fault_freq_hz, fault_amplitude
            )

        # Impulsive transients spike during imminent failure.
        if self.state == "imminent_failure" and self._rng.random() < 0.5:
            idx = int(self._rng.integers(0, self.window_size))
            signal[idx] += self._rng.choice([-1.0, 1.0]) * 0.7

        dominant_hz, rms = _spectral_stats(signal, self.sample_rate_hz)

        window = SensorWindow(
            timestamp=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            state_label=self.state,
            fft_window=[float(x) for x in signal],
            dominant_freq_hz=float(dominant_hz),
            rms_velocity=float(rms),
        )
        log.debug(
            "window_emitted",
            component="sensor.simulator",
            tick=self._tick,
            state=self.state,
            degradation=round(self.degradation_level, 3),
            dominant_hz=round(dominant_hz, 1),
            rms=round(rms, 3),
        )
        return window


def _spectral_stats(signal: np.ndarray, sample_rate_hz: float) -> tuple[float, float]:
    spectrum = np.abs(np.fft.rfft(signal))
    freqs = np.fft.rfftfreq(signal.size, d=1.0 / sample_rate_hz)
    # Ignore DC component when picking dominant frequency.
    if spectrum.size > 1:
        dominant_hz = float(freqs[1 + int(np.argmax(spectrum[1:]))])
    else:
        dominant_hz = 0.0
    rms = float(np.sqrt(np.mean(signal**2)))
    return dominant_hz, rms