Spaces:
Sleeping
Sleeping
| import os | |
| import struct | |
| import subprocess | |
| import tempfile | |
| import wave | |
| import numpy as np | |
class SpeechRecognizer:
    """Record an utterance from the microphone and transcribe it with Whisper.

    Audio is captured as 16-bit mono PCM in fixed-size chunks. Recording stops
    once speech has been detected and is followed by ``silence_timeout``
    seconds of quiet, or when ``max_duration`` seconds have elapsed.
    """

    def __init__(
        self,
        model_name: str = "base",
        sample_rate: int = 16000,
        silence_timeout: float = 3.0,
        max_duration: float = 120.0,
        silence_threshold: float = 500.0,
    ):
        """Store the recording configuration; the model is loaded lazily.

        Args:
            model_name: Name passed to ``whisper.load_model``.
            sample_rate: Capture rate in Hz.
            silence_timeout: Seconds of trailing quiet that end a recording.
            max_duration: Upper bound on the recording length in seconds.
            silence_threshold: RMS amplitude (of int16 samples) above which a
                chunk counts as speech.
        """
        self.model_name = model_name
        self.sample_rate = sample_rate
        self.silence_timeout = silence_timeout
        self.max_duration = max_duration
        self.silence_threshold = silence_threshold
        self.chunk_size = 1024
        self._model = None

    def _load_model(self):
        """Return the Whisper model, loading it on first use."""
        if self._model is None:
            # Imported here so the module can be imported without whisper.
            import whisper

            print(f"Whisperモデル ({self.model_name}) を読み込み中...")
            self._model = whisper.load_model(self.model_name)
        return self._model

    def listen(self) -> tuple[str, float]:
        """Record one utterance and return ``(text, duration_seconds)``.

        Returns ``("", 0.0)`` when no chunk ever exceeded the silence
        threshold. The duration covers everything recorded, including the
        trailing silence that ended the recording.
        """
        frames, has_speech = self._record()
        if not frames or not has_speech:
            return "", 0.0

        duration = len(frames) * self.chunk_size / self.sample_rate
        return self._transcribe(frames), duration

    def _record(self) -> tuple[list[bytes], bool]:
        """Capture chunks from the microphone until silence or the time limit."""
        import pyaudio

        silence_limit = int(self.silence_timeout * self.sample_rate / self.chunk_size)
        max_chunks = int(self.max_duration * self.sample_rate / self.chunk_size)

        frames: list[bytes] = []
        silent_chunks = 0
        has_speech = False

        audio = pyaudio.PyAudio()
        try:
            # Opened inside the outer try so a failure here still terminates
            # the PyAudio session.
            stream = audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=self.sample_rate,
                input=True,
                frames_per_buffer=self.chunk_size,
            )
            try:
                print("🎤 回答をどうぞ(話し終わったら少し待ってください)...")
                for _ in range(max_chunks):
                    data = stream.read(self.chunk_size, exception_on_overflow=False)
                    frames.append(data)

                    if self._calculate_rms(data) > self.silence_threshold:
                        has_speech = True
                        silent_chunks = 0
                    else:
                        silent_chunks += 1

                    # Leading silence never ends the recording; only quiet
                    # that follows detected speech does.
                    if has_speech and silent_chunks > silence_limit:
                        break
            finally:
                stream.stop_stream()
                stream.close()
        finally:
            audio.terminate()

        return frames, has_speech

    def _transcribe(self, frames: list[bytes]) -> str:
        """Write ``frames`` to a temporary WAV file and transcribe it."""
        # Only reserve a path here; the handle is closed before wave reopens it.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name

        try:
            with wave.open(tmp_path, "wb") as wf:
                wf.setnchannels(1)
                wf.setsampwidth(2)  # 2 bytes per sample, matching paInt16
                wf.setframerate(self.sample_rate)
                wf.writeframes(b"".join(frames))

            model = self._load_model()
            result = model.transcribe(tmp_path, language="ja")
            return result["text"].strip()
        finally:
            # Runs even when writing or transcription fails, so the file
            # created with delete=False never outlives this call.
            os.unlink(tmp_path)

    @staticmethod
    def _calculate_rms(data: bytes) -> float:
        """Return the root-mean-square amplitude of native-endian int16 PCM.

        A trailing odd byte is ignored; an empty buffer yields ``0.0``.
        """
        usable = len(data) - (len(data) % 2)
        if usable == 0:
            return 0.0
        # Widen to float64 before squaring so int16 values cannot overflow.
        samples = np.frombuffer(data[:usable], dtype=np.int16).astype(np.float64)
        return float(np.sqrt(np.mean(samples ** 2)))
class SpeechSynthesizer:
    """Speak Japanese text aloud using gTTS and the ``afplay`` command."""

    def speak(self, text: str) -> None:
        """Synthesize ``text`` to a temporary MP3 file and play it.

        The temporary file is removed afterwards, whether or not synthesis
        or playback succeeded. ``subprocess.run`` is called with
        ``check=True``, so a non-zero exit from ``afplay`` raises
        ``subprocess.CalledProcessError``.
        """
        from gtts import gTTS

        # Reserve a path only; the handle is closed before gTTS writes to it.
        handle = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
        with handle:
            mp3_path = handle.name

        try:
            gTTS(text=text, lang="ja").save(mp3_path)
            player_command = ["afplay", mp3_path]
            subprocess.run(player_command, check=True, capture_output=True)
        finally:
            if os.path.exists(mp3_path):
                os.unlink(mp3_path)
class DryRunRecognizer:
    """For testing: reads the answer from the keyboard instead of a microphone."""

    def listen(self) -> tuple[str, float]:
        """Read lines until an empty line or end of input.

        Returns the collected lines joined with newlines, and a duration
        that is always ``0.0``.
        """
        print("📝 回答を入力してください(空行で終了):")
        collected: list[str] = []
        try:
            # iter(input, "") keeps calling input() until it returns "".
            for entry in iter(input, ""):
                collected.append(entry)
        except EOFError:
            # End of input ends the answer just like an empty line does.
            pass
        return "\n".join(collected), 0.0
class DryRunSynthesizer:
    """For testing: prints the text instead of playing audio."""

    def speak(self, text: str) -> None:
        """Print ``text`` to standard output, prefixed with a speaker emoji."""
        message = f"🔊 {text}"
        print(message)