import asyncio import json import logging from collections import defaultdict from dataclasses import asdict, dataclass, field from datetime import datetime from typing import Any, AsyncIterator, DefaultDict, Dict, List LOGGER = logging.getLogger(__name__) @dataclass class TeacherAction: speech: str board_write: str | None board_draw: str | None gesture: str gaze_target: str body_motion: str teaching_state: str @dataclass class TelemetryEvent: ts: str topic: str payload: Dict[str, Any] @dataclass class TeacherStateMachine: current_state: str = "explaining" def transition(self, next_state: str) -> str: valid = {"explaining", "questioning", "correcting", "assigning_homework"} if next_state in valid: self.current_state = next_state return self.current_state @dataclass class MCPMiddleware: """ROS-like synthetic pub/sub middleware for classroom events.""" queues: DefaultDict[str, List[asyncio.Queue]] = field(default_factory=lambda: defaultdict(list)) telemetry: List[TelemetryEvent] = field(default_factory=list) state_machine: TeacherStateMachine = field(default_factory=TeacherStateMachine) telemetry_limit: int = 5000 async def publish(self, topic: str, payload: Dict[str, Any]) -> None: event = TelemetryEvent( ts=datetime.utcnow().isoformat() + "Z", topic=topic, payload=payload, ) self.telemetry.append(event) if len(self.telemetry) > self.telemetry_limit: self.telemetry = self.telemetry[-self.telemetry_limit :] for q in self.queues[topic]: await q.put(event) async def subscribe(self, topic: str) -> AsyncIterator[TelemetryEvent]: queue: asyncio.Queue = asyncio.Queue(maxsize=128) self.queues[topic].append(queue) try: while True: event: TelemetryEvent = await queue.get() yield event finally: self.queues[topic].remove(queue) async def apply_teacher_action(self, action_raw: Dict[str, Any]) -> TeacherAction: self.state_machine.transition(action_raw.get("teaching_state", "explaining")) action = TeacherAction( speech=action_raw["speech"], board_write=action_raw.get("board_write"), board_draw=action_raw.get("board_draw"), gesture=action_raw.get("gesture", "idle"), gaze_target=action_raw.get("gaze_target", "student"), body_motion=action_raw.get("body_motion", "stand"), teaching_state=self.state_machine.current_state, ) payload = asdict(action) await self.publish("teacher.actions", payload) if action.board_write: await self.publish("teacher.board.write", {"text": action.board_write}) if action.board_draw: await self.publish("teacher.board.draw", {"instruction": action.board_draw}) LOGGER.info("MCP action published: %s", json.dumps(payload)) return action def get_telemetry_snapshot(self, limit: int = 200) -> List[Dict[str, Any]]: return [asdict(item) for item in self.telemetry[-limit:]]