Saidie000's picture
Upload 83 files
09f1b19 verified
"""Dynamic chunked speech module."""
from __future__ import annotations
import asyncio
from typing import Deque
from collections import deque
from jenaai.core.module import BaseModule
class Module(BaseModule):
    """Streaming chunked speech with interruptible queue.

    Text passed to :meth:`speak` is tagged with its emotion, placed on a
    FIFO queue and published chunk by chunk on the ``speech.chunk`` topic.
    :meth:`stop` clears the queue, so a drain loop that is in progress
    ends once the chunk it is currently handling is done.
    """

    # Pause, in seconds, inserted after each published chunk.
    _CHUNK_PAUSE_SECONDS = 0.1

    def __init__(self, event_bus, config) -> None:
        """Initialise the base module and an empty speech queue.

        Args:
            event_bus: Forwarded unchanged to ``BaseModule.__init__``.
            config: Forwarded unchanged to ``BaseModule.__init__``.
        """
        super().__init__(event_bus, config)
        # Pending chunks, already formatted as "[emotion] text".
        self._queue: Deque[str] = deque()
        # True while a _drain() call is actively publishing chunks.
        self._speaking = False

    async def start(self) -> None:
        """Publish a ``system.log`` event announcing the module is ready."""
        # NOTE(review): self.event_bus and self.metadata are not set in this
        # class; presumably provided by BaseModule -- confirm.
        await self.event_bus.publish(
            "system.log",
            {"message": "Speech module ready", "module": self.metadata.name},
        )

    async def stop(self) -> None:
        """Drop all pending chunks and mark the module as not speaking."""
        self._queue.clear()
        self._speaking = False

    async def speak(self, text: str, emotion: str = "neutral") -> None:
        """Queue ``text`` for speech and drain the queue if it is idle.

        Args:
            text: The text to speak.
            emotion: Tag prepended to the chunk as ``[emotion]``.
        """
        self._queue.append(f"[{emotion}] {text}")
        # Only one drain loop should run at a time; a caller that arrives
        # while another drain is active just leaves its chunk on the queue.
        if not self._speaking:
            await self._drain()

    async def _drain(self) -> None:
        """Publish queued chunks in FIFO order until the queue is empty.

        Each chunk is published on ``speech.chunk`` and followed by a short
        pause. The ``_speaking`` flag is reset even when publishing raises
        or the task is cancelled; otherwise the flag would stay ``True``
        and every later :meth:`speak` call would enqueue without draining.
        """
        self._speaking = True
        try:
            while self._queue:
                chunk = self._queue.popleft()
                await self.event_bus.publish(
                    "speech.chunk",
                    {"text": chunk, "module": self.metadata.name},
                )
                await asyncio.sleep(self._CHUNK_PAUSE_SECONDS)
        finally:
            # Always release the flag so a failure cannot wedge the module.
            self._speaking = False