Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| import logging | |
| from collections import defaultdict | |
| from dataclasses import asdict, dataclass, field | |
| from datetime import datetime | |
| from typing import Any, AsyncIterator, DefaultDict, Dict, List | |
# Module-level logger, named after this module so log records can be filtered per module.
LOGGER = logging.getLogger(__name__)
| class TeacherAction: | |
| speech: str | |
| board_write: str | None | |
| board_draw: str | None | |
| gesture: str | |
| gaze_target: str | |
| body_motion: str | |
| teaching_state: str | |
@dataclass
class TelemetryEvent:
    """One published event, as stored in the telemetry log and sent to subscribers.

    Instances are created with keyword arguments and serialised with
    ``dataclasses.asdict``, so this class must be a dataclass.
    """

    # Timestamp string; the publisher writes an ISO-8601 value with a "Z" suffix.
    ts: str
    # Topic name the event was published on (e.g. "teacher.actions").
    topic: str
    # Event body; stored as given by the publisher, without copying.
    payload: Dict[str, Any]
@dataclass
class TeacherStateMachine:
    """Track which teaching state the teacher is currently in.

    Any known state may follow any other; the only rule enforced is that
    the target must be one of the known states.
    """

    # Unannotated on purpose: a plain class attribute, not a dataclass field.
    _VALID_STATES = frozenset(
        {"explaining", "questioning", "correcting", "assigning_homework"}
    )

    current_state: str = "explaining"

    def transition(self, next_state: str) -> str:
        """Move to *next_state* if it is a known state.

        Unknown states are ignored rather than rejected, so the machine
        keeps its previous state.

        Args:
            next_state: Name of the requested state.

        Returns:
            The state the machine is in after the call.
        """
        if next_state in self._VALID_STATES:
            self.current_state = next_state
        return self.current_state
@dataclass
class MCPMiddleware:
    """ROS-like synthetic pub/sub middleware for classroom events.

    Every published event is appended to a bounded in-memory telemetry log
    and delivered to the queue of each active subscriber of its topic.
    """

    # Topic name -> queues of the subscribers currently listening on it.
    queues: DefaultDict[str, List[asyncio.Queue]] = field(
        default_factory=lambda: defaultdict(list)
    )
    # Most recent events, oldest first; trimmed to ``telemetry_limit``.
    telemetry: List[TelemetryEvent] = field(default_factory=list)
    state_machine: TeacherStateMachine = field(default_factory=TeacherStateMachine)
    # Maximum number of events retained in ``telemetry``.
    telemetry_limit: int = 5000

    async def publish(self, topic: str, payload: Dict[str, Any]) -> None:
        """Record an event and deliver it to every subscriber of *topic*.

        Subscriber queues are bounded, so this coroutine waits when a
        subscriber's queue is full.

        Args:
            topic: Topic name to publish on.
            payload: Event body; stored and forwarded as-is (not copied).
        """
        # Function-scope import: the module only imports ``datetime`` itself.
        from datetime import timezone

        # ``datetime.utcnow()`` is deprecated. Take an aware UTC time and
        # drop the tzinfo so the rendered text keeps the "...Z" form
        # instead of gaining a "+00:00" suffix.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        event = TelemetryEvent(
            ts=now.isoformat() + "Z",
            topic=topic,
            payload=payload,
        )

        self.telemetry.append(event)
        # Trim by count rather than with ``[-limit:]``: a limit of 0 would
        # make that slice ``[-0:]``, which keeps the whole list.
        overflow = len(self.telemetry) - self.telemetry_limit
        if overflow > 0:
            del self.telemetry[:overflow]

        # ``.get`` avoids creating an empty defaultdict entry for topics
        # that nobody has ever subscribed to.
        subscribers = self.queues.get(topic)
        if not subscribers:
            return
        # Iterate over a snapshot: ``put`` can suspend, and a subscriber
        # leaving meanwhile mutates the live list, which would otherwise
        # make the loop skip a queue.
        for subscriber_queue in tuple(subscribers):
            # Skip subscribers that left while an earlier ``put`` was blocked.
            if subscriber_queue in subscribers:
                await subscriber_queue.put(event)

    async def subscribe(self, topic: str) -> AsyncIterator[TelemetryEvent]:
        """Yield events published on *topic* until the generator is closed.

        The subscriber is registered when iteration starts, not when this
        method is called, so events published before the first ``__anext__``
        are not delivered. The queue is unregistered when the generator is
        closed or garbage-collected.

        Args:
            topic: Topic name to listen on.

        Yields:
            Each event published on the topic, in publication order.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=128)
        self.queues[topic].append(queue)
        try:
            while True:
                event: TelemetryEvent = await queue.get()
                yield event
        finally:
            self.queues[topic].remove(queue)

    async def apply_teacher_action(self, action_raw: Dict[str, Any]) -> TeacherAction:
        """Normalise a raw action, update the teaching state and publish it.

        The action is published on "teacher.actions"; truthy board fields
        are additionally published on "teacher.board.write" and
        "teacher.board.draw".

        Args:
            action_raw: Raw action mapping. "speech" is required; other
                keys fall back to defaults.

        Returns:
            The normalised action.

        Raises:
            KeyError: If ``action_raw`` has no "speech" key.
        """
        # Read the required key first so a malformed action cannot change
        # the teaching state before failing.
        speech = action_raw["speech"]
        self.state_machine.transition(action_raw.get("teaching_state", "explaining"))

        action = TeacherAction(
            speech=speech,
            board_write=action_raw.get("board_write"),
            board_draw=action_raw.get("board_draw"),
            gesture=action_raw.get("gesture", "idle"),
            gaze_target=action_raw.get("gaze_target", "student"),
            body_motion=action_raw.get("body_motion", "stand"),
            # Report the state actually in effect; an unknown requested
            # state leaves the previous one in place.
            teaching_state=self.state_machine.current_state,
        )
        payload = asdict(action)

        await self.publish("teacher.actions", payload)
        if action.board_write:
            await self.publish("teacher.board.write", {"text": action.board_write})
        if action.board_draw:
            await self.publish("teacher.board.draw", {"instruction": action.board_draw})

        # Only pay for JSON serialisation when the record will be emitted.
        if LOGGER.isEnabledFor(logging.INFO):
            LOGGER.info("MCP action published: %s", json.dumps(payload))
        return action

    def get_telemetry_snapshot(self, limit: int = 200) -> List[Dict[str, Any]]:
        """Return the most recent telemetry events as plain dictionaries.

        Args:
            limit: Maximum number of events to return. Non-positive values
                return an empty list.

        Returns:
            Up to *limit* events, oldest first, each converted with
            ``dataclasses.asdict``.
        """
        # ``[-0:]`` would return the whole log, so handle 0 explicitly.
        if limit <= 0:
            return []
        return [asdict(item) for item in self.telemetry[-limit:]]