File size: 4,240 Bytes
8ad9950
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""MCP server exposing the bearing-fault simulator over SSE on port 8765.

Resources
- ``sensor://vibration/latest``  — most recent window as JSON
- ``sensor://vibration/stream``  — alias for ``latest`` (single-shot read; clients poll)
- ``sensor://vibration/history`` — last 60 windows as a JSON array

Tools
- ``set_state(state)`` — force the simulator into a named state
- ``get_stats()``      — current RMS, dominant frequency, sample count
"""

from __future__ import annotations

import asyncio
import json
from collections import deque
from typing import Deque

import structlog
from mcp.server.fastmcp import FastMCP

from src.sensor.simulator import BearingFaultSimulator, SensorWindow

# Module-level structlog logger; the log calls in this module pass
# ``component="sensor.mcp_server"`` so events can be attributed to it.
log = structlog.get_logger()

# Seconds the background emit loop sleeps between generated windows.
EMIT_INTERVAL_SECONDS: float = 5.0
# Capacity of ``_history``; it is used as the deque's ``maxlen`` below, so the
# oldest window is discarded once this many are stored.
HISTORY_SIZE: int = 60
# Port assigned to the FastMCP server settings below.
SERVER_PORT: int = 8765

# Single simulator instance shared by every resource and tool in this module.
_simulator: BearingFaultSimulator = BearingFaultSimulator()
# Bounded buffer of generated windows; new windows are appended on the right.
_history: Deque[SensorWindow] = deque(maxlen=HISTORY_SIZE)
# Handle to the background emit task; stays ``None`` until a handler calls
# ``_ensure_emit_loop`` for the first time.
_emit_task: asyncio.Task | None = None

mcp: FastMCP = FastMCP("factoryflow-sensor")
# NOTE(review): "0.0.0.0" presumably makes the server listen on all network
# interfaces — confirm that this exposure is intended for the deployment.
mcp.settings.host = "0.0.0.0"
mcp.settings.port = SERVER_PORT


def _emit_once() -> SensorWindow:
    """Generate one window from the shared simulator and record it.

    Returns:
        The window that was just generated and appended to ``_history``.
    """
    _history.append(fresh := _simulator.generate_window())
    return fresh


async def _emit_loop() -> None:
    """Background task that appends a new window to history at a fixed interval.

    Logs a start event, seeds the history if it is still empty, and then
    repeatedly sleeps ``EMIT_INTERVAL_SECONDS`` and emits one window.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the task is
            cancelled, so cancellation propagates normally.
    """
    log.info(
        "emit_loop_start",
        component="sensor.mcp_server",
        interval_s=EMIT_INTERVAL_SECONDS,
    )
    # Seed so the first read after startup is non-empty — but only when nothing
    # has been recorded yet. This task is started lazily by the resource/tool
    # handlers, which may already have generated a window on demand before this
    # coroutine gets its first turn; seeding unconditionally would then leave
    # two back-to-back windows in the history and break the emit cadence.
    if not _history:
        _emit_once()
    while True:
        try:
            await asyncio.sleep(EMIT_INTERVAL_SECONDS)
            _emit_once()
        except asyncio.CancelledError:
            log.info(
                "emit_loop_cancelled",
                component="sensor.mcp_server",
            )
            raise
        except Exception as exc:  # pragma: no cover — keep loop alive
            # Any other failure is logged and the loop carries on with the
            # next interval instead of terminating the task.
            log.error(
                "emit_loop_error",
                component="sensor.mcp_server",
                error=str(exc),
            )


async def _ensure_emit_loop() -> None:
    """Start the background emit task unless one is already running.

    A task that has finished (for any reason) is replaced by a new one.
    The task is only created here; it is stored in the module-level
    ``_emit_task`` so later calls can check its status.
    """
    global _emit_task
    current = _emit_task
    if current is not None and not current.done():
        # A live task already exists; nothing to do.
        return
    _emit_task = asyncio.create_task(_emit_loop())


@mcp.resource("sensor://vibration/latest")
async def latest_window() -> str:
    """Return the most recent sensor window as JSON."""
    await _ensure_emit_loop()
    # If nothing has been recorded yet, generate a window on demand so there
    # is always something to serialize; otherwise take the newest entry.
    newest = _history[-1] if _history else _emit_once()
    return json.dumps(newest.to_dict())


@mcp.resource("sensor://vibration/stream")
async def stream_window() -> str:
    """Single-shot read of the latest window (clients poll for streaming)."""
    # Pure alias: delegate to the ``latest`` handler and hand back its JSON
    # string unchanged.
    payload = await latest_window()
    return payload


@mcp.resource("sensor://vibration/history")
async def history_windows() -> str:
    """Return the last ``HISTORY_SIZE`` windows as a JSON array."""
    await _ensure_emit_loop()
    if not _history:
        # A freshly created emit task has not had a turn on the event loop
        # yet, so the history can still be empty at this point. Generate one
        # window on demand so the first read after startup is not an empty
        # array.
        _emit_once()
    return json.dumps([w.to_dict() for w in _history])


@mcp.tool()
async def set_state(state: str) -> str:
    """Force the simulator into ``normal``, ``degrading``, or ``imminent_failure``."""
    try:
        _simulator.set_state(state)
    except ValueError as exc:
        # The simulator rejected the requested state: report the failure in
        # the JSON reply instead of letting the exception propagate.
        reason = str(exc)
        log.warning(
            "set_state_invalid",
            component="sensor.mcp_server",
            requested=state,
            error=reason,
        )
        return json.dumps({"ok": False, "error": reason})

    # Success: echo back the simulator's state and its rounded degradation.
    reply = {"ok": True}
    reply["state"] = _simulator.state
    reply["degradation_level"] = round(_simulator.degradation_level, 3)
    return json.dumps(reply)


@mcp.tool()
async def get_stats() -> str:
    """Return current RMS, dominant frequency, and total samples emitted."""
    await _ensure_emit_loop()
    # If nothing has been recorded yet, generate a window on demand;
    # otherwise report on the newest entry.
    newest = _history[-1] if _history else _emit_once()
    stats = {
        "state": _simulator.state,
        "degradation_level": round(_simulator.degradation_level, 3),
        "dominant_freq_hz": newest.dominant_freq_hz,
        "rms_velocity": newest.rms_velocity,
        # NOTE(review): this is the number of windows currently held in the
        # bounded history (at most HISTORY_SIZE), not a running total of
        # everything emitted — confirm which one callers expect.
        "sample_count": len(_history),
        "last_timestamp": newest.timestamp,
    }
    return json.dumps(stats)


def main() -> None:
    """Log a ``server_start`` event and run the FastMCP server over SSE."""
    startup_fields = {
        "component": "sensor.mcp_server",
        "port": SERVER_PORT,
        "transport": "sse",
    }
    log.info("server_start", **startup_fields)
    mcp.run(transport="sse")


# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()