"""Speech input/output helpers.

Provides a microphone + Whisper based recognizer, a gTTS based synthesizer,
and keyboard/console stand-ins with the same ``listen`` / ``speak`` methods
for running without audio hardware.
"""

import os
import struct
import subprocess
import tempfile
import wave

import numpy as np


class SpeechRecognizer:
    """Record speech from the default microphone and transcribe it with Whisper.

    Recording is 16-bit mono. It stops once speech has been detected and is
    followed by ``silence_timeout`` seconds of silence, or after
    ``max_duration`` seconds, whichever comes first.
    """

    def __init__(
        self,
        model_name: str = "base",
        sample_rate: int = 16000,
        silence_timeout: float = 3.0,
        max_duration: float = 120.0,
        silence_threshold: float = 500.0,
    ):
        """Store the recording/transcription settings; the model is loaded lazily.

        Args:
            model_name: Name passed to ``whisper.load_model``.
            sample_rate: Recording sample rate in Hz.
            silence_timeout: Seconds of trailing silence that end a recording
                once speech has been heard.
            max_duration: Upper bound on the recording length in seconds.
            silence_threshold: RMS amplitude (of the 16-bit samples) above
                which a chunk counts as speech.
        """
        self.model_name = model_name
        self.sample_rate = sample_rate
        self.silence_timeout = silence_timeout
        self.max_duration = max_duration
        self.silence_threshold = silence_threshold
        self.chunk_size = 1024
        self._model = None

    def _load_model(self):
        """Return the Whisper model, loading it on first use."""
        if self._model is None:
            # Imported lazily so the module can be imported without whisper.
            import whisper

            print(f"Whisperモデル ({self.model_name}) を読み込み中...")
            self._model = whisper.load_model(self.model_name)
        return self._model

    def listen(self) -> tuple[str, float]:
        """Record one utterance and return ``(text, duration_seconds)``.

        Returns ``("", 0.0)`` when no chunk ever exceeded the silence
        threshold. ``duration`` is the length of the whole recording,
        including leading and trailing silence.
        """
        frames, has_speech = self._record_frames()
        if not frames or not has_speech:
            return "", 0.0

        duration = len(frames) * self.chunk_size / self.sample_rate

        # Only reserve a path here; the file is written and consumed inside
        # the try block below so it is removed even if writing fails.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
        try:
            with wave.open(tmp_path, "wb") as wf:
                wf.setnchannels(1)
                wf.setsampwidth(2)  # 16-bit samples
                wf.setframerate(self.sample_rate)
                wf.writeframes(b"".join(frames))

            model = self._load_model()
            result = model.transcribe(tmp_path, language="ja")
            text = result["text"].strip()
        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
        return text, duration

    def _record_frames(self) -> tuple[list[bytes], bool]:
        """Capture microphone chunks; return ``(frames, has_speech)``."""
        # Imported lazily so the module can be imported without pyaudio.
        import pyaudio

        chunks_per_second = self.sample_rate / self.chunk_size
        silence_limit = int(self.silence_timeout * chunks_per_second)
        max_chunks = int(self.max_duration * chunks_per_second)

        frames: list[bytes] = []
        silent_chunks = 0
        has_speech = False

        audio = pyaudio.PyAudio()
        try:
            # Opened inside the try so a failure here still terminates PyAudio.
            stream = audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=self.sample_rate,
                input=True,
                frames_per_buffer=self.chunk_size,
            )
            try:
                print("🎤 回答をどうぞ(話し終わったら少し待ってください)...")
                for _ in range(max_chunks):
                    data = stream.read(
                        self.chunk_size, exception_on_overflow=False
                    )
                    frames.append(data)

                    if self._calculate_rms(data) > self.silence_threshold:
                        has_speech = True
                        silent_chunks = 0
                    else:
                        silent_chunks += 1

                    # Silence before any speech never ends the recording.
                    if has_speech and silent_chunks > silence_limit:
                        break
            finally:
                stream.stop_stream()
                stream.close()
        finally:
            audio.terminate()

        return frames, has_speech

    @staticmethod
    def _calculate_rms(data: bytes) -> float:
        """Return the RMS amplitude of native-endian 16-bit PCM ``data``.

        A trailing odd byte (an incomplete sample) is ignored. Returns 0.0
        when ``data`` contains no complete sample.
        """
        count = len(data) // 2
        if count == 0:
            return 0.0
        # ``count`` limits the read to whole samples, so odd-length buffers
        # do not raise.
        samples = np.frombuffer(data, dtype=np.int16, count=count)
        # Widen before squaring to avoid int16 overflow.
        arr = samples.astype(np.float64)
        return float(np.sqrt(np.mean(arr ** 2)))


class SpeechSynthesizer:
    """Speak text aloud using gTTS and the ``afplay`` command."""

    def speak(self, text: str) -> None:
        """Synthesize ``text`` as Japanese speech and play it.

        Empty or whitespace-only text is skipped. The audio is saved to a
        temporary MP3 that is removed afterwards.

        Raises:
            subprocess.CalledProcessError: If ``afplay`` exits with a
                non-zero status.
        """
        if not text.strip():
            # Nothing to say; avoid creating a temp file and calling gTTS.
            return

        # Imported lazily so the module can be imported without gtts.
        from gtts import gTTS

        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
            tmp_path = tmp.name
        try:
            tts = gTTS(text=text, lang="ja")
            tts.save(tmp_path)
            subprocess.run(
                ["afplay", tmp_path],
                check=True,
                capture_output=True,
            )
        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)


class DryRunRecognizer:
    """For testing: reads keyboard input instead of the microphone."""

    def listen(self) -> tuple[str, float]:
        """Read lines from stdin until an empty line or EOF.

        Returns:
            The entered lines joined with newlines, and a duration of 0.0.
        """
        print("📝 回答を入力してください(空行で終了):")
        lines: list[str] = []
        while True:
            try:
                line = input()
            except EOFError:
                break
            if line == "":
                break
            lines.append(line)
        return "\n".join(lines), 0.0


class DryRunSynthesizer:
    """For testing: prints the text instead of playing audio."""

    def speak(self, text: str) -> None:
        """Print ``text`` to stdout prefixed with a speaker icon."""
        print(f"🔊 {text}")