factoryflow / src /agents /tools /sensor_tool.py
oabolade23's picture
FactoryFlow demo — initial submission
8ad9950
Raw
History Blame Contribute Delete
2.08 kB
"""CrewAI tool: read the latest sensor window and run anomaly detection.
The tool keeps a process-local simulator + last-result cache so successive
agent calls within one Crew kickoff see consistent state. The MCP server is
the public-facing stream for the Gradio UI; agents use this in-process path
for tighter latency and easier orchestration.
"""
from __future__ import annotations
import json
import numpy as np
import structlog
from crewai.tools import tool
from src.inference.anomaly_detector import AnomalyResult, detect
from src.sensor.simulator import BearingFaultSimulator
log = structlog.get_logger()
_simulator = BearingFaultSimulator()
_last: tuple[str, AnomalyResult] | None = None
def force_state(state: str) -> None:
"""Demo helper — switch the in-process simulator state from the UI."""
_simulator.set_state(state)
def latest() -> tuple[str, AnomalyResult] | None:
"""Return the most recent (state, result) pair without re-running inference."""
return _last
@tool("read_sensor_anomaly")
def read_sensor_anomaly() -> str:
"""Read the next vibration window from the in-process simulator,
run MOMENT anomaly detection on it, and return the result as JSON."""
global _last
try:
window = _simulator.generate_window()
fft = np.array(window.fft_window, dtype=np.float32)
result = detect(fft)
except Exception as exc:
log.error(
"sensor_tool_failed",
component="agents.tools.sensor_tool",
error=str(exc),
)
return json.dumps({"ok": False, "error": str(exc)})
_last = (_simulator.state, result)
payload = {
"ok": True,
"state_label": _simulator.state,
"dominant_freq_hz": window.dominant_freq_hz,
"rms_velocity": window.rms_velocity,
**result.as_dict(),
}
log.info(
"sensor_tool_ok",
component="agents.tools.sensor_tool",
state=_simulator.state,
score=payload["score"],
rul_hours=payload["rul_hours"],
)
return json.dumps(payload)