Spaces:
Sleeping
Sleeping
File size: 4,240 Bytes
8ad9950 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | """MCP server exposing the bearing-fault simulator over SSE on port 8765.
Resources
- ``sensor://vibration/latest`` — most recent window as JSON
- ``sensor://vibration/stream`` — alias for ``latest`` (single-shot read; clients poll)
- ``sensor://vibration/history`` — last 60 windows as a JSON array
Tools
- ``set_state(state)`` — force the simulator into a named state
- ``get_stats()`` — current RMS, dominant frequency, sample count
"""
from __future__ import annotations
import asyncio
import json
from collections import deque
from typing import Deque
import structlog
from mcp.server.fastmcp import FastMCP
from src.sensor.simulator import BearingFaultSimulator, SensorWindow
log = structlog.get_logger()
EMIT_INTERVAL_SECONDS: float = 5.0
HISTORY_SIZE: int = 60
SERVER_PORT: int = 8765
_simulator: BearingFaultSimulator = BearingFaultSimulator()
_history: Deque[SensorWindow] = deque(maxlen=HISTORY_SIZE)
_emit_task: asyncio.Task | None = None
mcp: FastMCP = FastMCP("factoryflow-sensor")
mcp.settings.host = "0.0.0.0"
mcp.settings.port = SERVER_PORT
def _emit_once() -> SensorWindow:
window = _simulator.generate_window()
_history.append(window)
return window
async def _emit_loop() -> None:
log.info(
"emit_loop_start",
component="sensor.mcp_server",
interval_s=EMIT_INTERVAL_SECONDS,
)
# Seed immediately so the first read after startup is non-empty.
_emit_once()
while True:
try:
await asyncio.sleep(EMIT_INTERVAL_SECONDS)
_emit_once()
except asyncio.CancelledError:
log.info(
"emit_loop_cancelled",
component="sensor.mcp_server",
)
raise
except Exception as exc: # pragma: no cover — keep loop alive
log.error(
"emit_loop_error",
component="sensor.mcp_server",
error=str(exc),
)
async def _ensure_emit_loop() -> None:
global _emit_task
if _emit_task is None or _emit_task.done():
_emit_task = asyncio.create_task(_emit_loop())
@mcp.resource("sensor://vibration/latest")
async def latest_window() -> str:
"""Return the most recent sensor window as JSON."""
await _ensure_emit_loop()
if not _history:
_emit_once()
return json.dumps(_history[-1].to_dict())
@mcp.resource("sensor://vibration/stream")
async def stream_window() -> str:
"""Single-shot read of the latest window (clients poll for streaming)."""
return await latest_window()
@mcp.resource("sensor://vibration/history")
async def history_windows() -> str:
"""Return the last ``HISTORY_SIZE`` windows as a JSON array."""
await _ensure_emit_loop()
return json.dumps([w.to_dict() for w in _history])
@mcp.tool()
async def set_state(state: str) -> str:
"""Force the simulator into ``normal``, ``degrading``, or ``imminent_failure``."""
try:
_simulator.set_state(state)
except ValueError as exc:
log.warning(
"set_state_invalid",
component="sensor.mcp_server",
requested=state,
error=str(exc),
)
return json.dumps({"ok": False, "error": str(exc)})
return json.dumps(
{
"ok": True,
"state": _simulator.state,
"degradation_level": round(_simulator.degradation_level, 3),
}
)
@mcp.tool()
async def get_stats() -> str:
"""Return current RMS, dominant frequency, and total samples emitted."""
await _ensure_emit_loop()
if not _history:
_emit_once()
latest = _history[-1]
return json.dumps(
{
"state": _simulator.state,
"degradation_level": round(_simulator.degradation_level, 3),
"dominant_freq_hz": latest.dominant_freq_hz,
"rms_velocity": latest.rms_velocity,
"sample_count": len(_history),
"last_timestamp": latest.timestamp,
}
)
def main() -> None:
log.info(
"server_start",
component="sensor.mcp_server",
port=SERVER_PORT,
transport="sse",
)
mcp.run(transport="sse")
if __name__ == "__main__":
main()
|