Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, WebSocket | |
| from pywhispercpp.model import Model | |
| import uvicorn, tempfile, os | |
| from time import time | |
| import wave | |
| app = FastAPI(title="pyWhisperCPP API") | |
| model = Model("base.en") | |
| def root(): | |
| return {"status": "Whisper.cpp API is running!"} | |
| async def transcribe(file: UploadFile = File(...)): | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp: | |
| temp.write(await file.read()) | |
| temp.flush() | |
| audio_path = temp.name | |
| start = time() | |
| segments = model.transcribe(audio_path) | |
| text = " ".join(seg.text for seg in segments) | |
| os.remove(audio_path) | |
| return {"text": text} | |
| # ================================ | |
| # 🔥 Real-time streaming endpoint | |
| # ================================ | |
| def save_pcm16_as_wav(data: bytes, sample_rate=16000, channels=1): | |
| """Convert PCM16 stream into a valid WAV file""" | |
| temp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") | |
| with wave.open(temp.name, "wb") as wf: | |
| wf.setnchannels(channels) | |
| wf.setsampwidth(2) # PCM16 = 2 bytes per sample | |
| wf.setframerate(sample_rate) | |
| wf.writeframes(data) | |
| return temp.name | |
| async def websocket_live(websocket: WebSocket): | |
| await websocket.accept() | |
| print("Client connected!") | |
| buffer = b"" | |
| SAMPLE_RATE = 16000 | |
| MIN_PCM_SIZE = SAMPLE_RATE * 2 * 2 # 2 seconds PCM16 | |
| try: | |
| while True: | |
| data = await websocket.receive() | |
| # Text (control) | |
| if data.get("text"): | |
| if data["text"] == "__END__": | |
| break | |
| continue | |
| # Binary PCM audio | |
| chunk = data.get("bytes", b"") | |
| if not chunk: | |
| continue | |
| buffer += chunk | |
| if len(buffer) >= MIN_PCM_SIZE: | |
| print(f"Transcribing {len(buffer)} bytes") | |
| # Convert PCM16 → WAV with header | |
| audio_path = save_pcm16_as_wav(buffer, SAMPLE_RATE) | |
| segments = model.transcribe(audio_path) | |
| text = " ".join(seg.text for seg in segments).strip() | |
| os.remove(audio_path) | |
| buffer = b"" | |
| if text: | |
| await websocket.send_text(text) | |
| await websocket.send_text("[END]") | |
| except Exception as e: | |
| print("WebSocket Error:", e) | |
| finally: | |
| await websocket.close() | |
| print("Client disconnected") | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |