# Ladybug / middleware.py
# Adedoyinjames's picture
# Upload 6 files
# 5374858 verified
import asyncio
import json
import logging
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, AsyncIterator, DefaultDict, Dict, List
# Module-level logger named after this module; MCPMiddleware logs published actions through it.
LOGGER = logging.getLogger(__name__)
@dataclass
class TeacherAction:
    """One structured teacher action: speech plus optional board and body cues.

    Instances are built by ``MCPMiddleware.apply_teacher_action`` from a raw
    dict and converted back to a dict with ``asdict`` before being published.
    """

    speech: str  # required; read as action_raw["speech"] by apply_teacher_action
    board_write: str | None  # text republished on "teacher.board.write" when truthy
    board_draw: str | None  # instruction republished on "teacher.board.draw" when truthy
    gesture: str  # apply_teacher_action falls back to "idle" when absent
    gaze_target: str  # apply_teacher_action falls back to "student" when absent
    body_motion: str  # apply_teacher_action falls back to "stand" when absent
    teaching_state: str  # the state machine's current state when the action was built
@dataclass
class TelemetryEvent:
    """A timestamped record of one payload published on a topic."""

    ts: str  # filled by MCPMiddleware.publish with a UTC ISO-8601 string ending in "Z"
    topic: str  # name of the topic the payload was published on
    payload: Dict[str, Any]  # event body exactly as passed to publish()
@dataclass
class TeacherStateMachine:
    """Tracks which teaching mode the teacher is currently in."""

    current_state: str = "explaining"

    def transition(self, next_state: str) -> str:
        """Switch to ``next_state`` when it is a recognized state.

        Unrecognized states are ignored and the current state is kept.

        Returns:
            The state in effect after the call.
        """
        allowed = {"explaining", "questioning", "correcting", "assigning_homework"}
        if next_state not in allowed:
            # Unknown request: leave the machine where it is.
            return self.current_state
        self.current_state = next_state
        return next_state
@dataclass
class MCPMiddleware:
    """ROS-like synthetic pub/sub middleware for classroom events.

    Every published payload is recorded in a bounded in-memory telemetry log
    and fanned out to the per-topic subscriber queues. Delivery never blocks
    the publisher: when a subscriber's bounded queue is full, its oldest
    pending event is dropped to make room for the new one.

    Attributes:
        queues: Maps a topic name to the queues of its active subscribers.
        telemetry: Most recent events, oldest first, capped at
            ``telemetry_limit`` entries.
        state_machine: Holds the teacher's current teaching state.
        telemetry_limit: Maximum number of events kept in ``telemetry``.
    """

    queues: DefaultDict[str, List[asyncio.Queue]] = field(
        default_factory=lambda: defaultdict(list)
    )
    telemetry: List[TelemetryEvent] = field(default_factory=list)
    state_machine: TeacherStateMachine = field(default_factory=TeacherStateMachine)
    telemetry_limit: int = 5000

    async def publish(self, topic: str, payload: Dict[str, Any]) -> None:
        """Record ``payload`` as a telemetry event and deliver it to subscribers.

        Args:
            topic: Topic name to publish on.
            payload: Event body; stored and delivered by reference (not copied).
        """
        event = TelemetryEvent(
            # Aware "now" replaces the deprecated utcnow(); tzinfo is stripped
            # so the rendered string keeps the original "...Z" format.
            ts=datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
            topic=topic,
            payload=payload,
        )
        self.telemetry.append(event)

        # Trim by count rather than with [-limit:], which keeps everything
        # when the limit is 0.
        overflow = len(self.telemetry) - max(self.telemetry_limit, 0)
        if overflow > 0:
            del self.telemetry[:overflow]

        # .get() avoids creating an empty defaultdict entry for topics nobody
        # subscribes to; the tuple snapshot keeps iteration safe if a
        # subscriber unregisters itself meanwhile.
        for q in tuple(self.queues.get(topic, ())):
            if q.full():
                # A stalled subscriber must not block the publisher (and every
                # other subscriber): discard its oldest pending event instead.
                q.get_nowait()
                LOGGER.warning(
                    "Subscriber queue full on topic %s; dropped oldest event", topic
                )
            q.put_nowait(event)

    async def subscribe(self, topic: str) -> AsyncIterator[TelemetryEvent]:
        """Yield events published on ``topic`` until the iterator is closed.

        The subscription's queue (capacity 128) is registered when iteration
        starts, so events published before the first ``__anext__`` call are
        not delivered. The queue is unregistered when the generator is closed.

        Args:
            topic: Topic name to listen on.

        Yields:
            Each ``TelemetryEvent`` delivered to this subscription, in order.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=128)
        self.queues[topic].append(queue)
        try:
            while True:
                event: TelemetryEvent = await queue.get()
                yield event
        finally:
            subscribers = self.queues[topic]
            subscribers.remove(queue)
            if not subscribers:
                # Drop the now-empty topic entry so the mapping does not grow
                # with topics that no longer have listeners.
                del self.queues[topic]

    async def apply_teacher_action(self, action_raw: Dict[str, Any]) -> TeacherAction:
        """Build a ``TeacherAction`` from a raw dict and publish it.

        The action is published on ``"teacher.actions"``; non-empty board text
        and drawing instructions are additionally published on
        ``"teacher.board.write"`` and ``"teacher.board.draw"``.

        Args:
            action_raw: Raw action fields. ``"speech"`` is required; a missing
                ``"teaching_state"`` is treated as ``"explaining"``; the other
                fields fall back to the defaults used below.

        Returns:
            The constructed ``TeacherAction``.

        Raises:
            KeyError: If ``action_raw`` has no ``"speech"`` entry.
        """
        self.state_machine.transition(action_raw.get("teaching_state", "explaining"))
        action = TeacherAction(
            speech=action_raw["speech"],
            board_write=action_raw.get("board_write"),
            board_draw=action_raw.get("board_draw"),
            gesture=action_raw.get("gesture", "idle"),
            gaze_target=action_raw.get("gaze_target", "student"),
            body_motion=action_raw.get("body_motion", "stand"),
            # Use the state machine's value: an unrecognized requested state
            # is ignored by transition(), so the raw value may be stale.
            teaching_state=self.state_machine.current_state,
        )
        payload = asdict(action)
        await self.publish("teacher.actions", payload)
        if action.board_write:
            await self.publish("teacher.board.write", {"text": action.board_write})
        if action.board_draw:
            await self.publish("teacher.board.draw", {"instruction": action.board_draw})
        # Skip the JSON serialization entirely when INFO logging is disabled.
        if LOGGER.isEnabledFor(logging.INFO):
            LOGGER.info("MCP action published: %s", json.dumps(payload))
        return action

    def get_telemetry_snapshot(self, limit: int = 200) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the most recent events as plain dicts.

        Args:
            limit: Maximum number of events to return. Zero or a negative
                value yields an empty list.

        Returns:
            The newest events, oldest first, each converted with ``asdict``.
        """
        if limit <= 0:
            # Without this guard, [-0:] would return the whole history.
            return []
        return [asdict(item) for item in self.telemetry[-limit:]]