Spaces:
Sleeping
Sleeping
import os
import tempfile
import wave
from time import time

import uvicorn
from fastapi import FastAPI, UploadFile, File, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from pywhispercpp.model import Model
app = FastAPI(title="pyWhisperCPP Streaming API")

# Allow cross-origin requests so a separately hosted frontend can call this API.
# Credentials stay disabled: a wildcard origin must not be combined with
# allow_credentials=True, otherwise any site could issue credentialed requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Load the Whisper.cpp model once at import time; the handlers in this module
# all use this single module-level instance.
# Other model names include 'tiny.en', 'base.en', etc.
model = Model("base.en")
| # ---------- Simple HTML frontend ---------- | |
async def index():
    """Return the contents of ``index.html`` as an HTML response."""
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on ``app``.
    with open("index.html", "r", encoding="utf-8") as page:
        markup = page.read()
    return HTMLResponse(markup)
| # ---------- Normal file upload transcription ---------- | |
async def transcribe(file: UploadFile = File(...)):
    """Transcribe an uploaded audio file and report how long it took.

    Args:
        file: The uploaded audio. Its whole content is read into memory and
            written to a temporary ``.wav``-suffixed file for the model.

    Returns:
        A dict with ``"text"`` (segment texts joined by single spaces) and
        ``"processing_time_seconds"`` (transcription wall time, rounded to
        three decimals).
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on ``app``.

    # Read the upload before creating the temp file so a failed read cannot
    # leave a stray file behind.
    payload = await file.read()

    temp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    audio_path = temp.name
    try:
        # Closing the file (end of the ``with``) flushes it to disk, so the
        # model sees the complete upload when it opens the path.
        with temp:
            temp.write(payload)

        start = time()
        segments = model.transcribe(audio_path)
        text = " ".join(seg.text for seg in segments)
        elapsed = round(time() - start, 3)
        return {
            "text": text,
            "processing_time_seconds": elapsed,
        }
    finally:
        # delete=False means cleanup is our job, on success and on failure.
        os.remove(audio_path)
| # ---------- WebSocket streaming transcription ---------- | |
def _transcribe_webm_buffer(data):
    """Write ``data`` to a temporary ``.webm`` file, transcribe it, return the stripped text.

    The temporary file is removed whether or not writing or transcription
    succeeds. ``model.transcribe`` is called synchronously.
    """
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=".webm")
    audio_path = temp.name
    try:
        with temp:
            temp.write(data)
        segments = model.transcribe(audio_path)
        return " ".join(seg.text for seg in segments).strip()
    finally:
        os.remove(audio_path)


async def websocket_transcription(websocket: WebSocket):
    """
    Receives binary audio chunks (WebM/Opus) from the browser via WebSocket,
    periodically transcribes the buffered audio with Whisper.cpp,
    and sends back partial text.

    Protocol:
      * binary frames are appended to a buffer; once it holds at least
        ``MIN_CHUNK_SIZE`` bytes it is transcribed, any non-empty text is
        sent back, and the buffer is cleared;
      * the text frame ``"__END__"`` ends the stream: audio still buffered is
        transcribed, then ``"[stream ended]"`` is sent and the socket closed;
      * other text frames are ignored.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on ``app``.
    # NOTE(review): the buffer is cleared after each transcription; if the
    # client streams one continuous WebM recording, later buffers presumably
    # lack the container header — confirm they still decode.
    await websocket.accept()

    MIN_CHUNK_SIZE = 40_000  # bytes before running a transcription (tune this)
    pending = bytearray()
    client_connected = True

    try:
        while True:
            message = await websocket.receive()

            # The raw receive() reports a disconnect as a message instead of
            # raising; calling receive() again afterwards is an error, so stop.
            if message.get("type") == "websocket.disconnect":
                client_connected = False
                return

            # Handle text messages (control)
            text_msg = message.get("text")
            if text_msg is not None:
                if text_msg == "__END__":
                    break
                # Ignore other text controls for now
                continue

            # Handle binary audio data
            chunk = message.get("bytes")
            if not chunk:
                continue
            pending += chunk

            # When enough audio collected, transcribe
            if len(pending) >= MIN_CHUNK_SIZE:
                text = _transcribe_webm_buffer(pending)
                # Clear buffer (or keep tail if you want overlap)
                pending.clear()
                if text:
                    await websocket.send_text(text)

        # Flush audio that never reached MIN_CHUNK_SIZE so short recordings
        # and the tail of long ones are not silently dropped.
        if pending:
            text = _transcribe_webm_buffer(pending)
            pending.clear()
            if text:
                await websocket.send_text(text)

        # End-of-stream message
        await websocket.send_text("[stream ended]")
    except WebSocketDisconnect:
        # Client went away while we were sending
        client_connected = False
    finally:
        # Only close a socket the client has not already closed.
        if client_connected:
            await websocket.close()
def _transcribe_pcm_frames(frames, sample_rate):
    """Wrap raw PCM ``frames`` in a WAV file, transcribe it, return the stripped text.

    The frames are written as mono, 2 bytes per sample, at ``sample_rate`` Hz.
    The temporary file is removed whether or not writing or transcription
    succeeds. ``model.transcribe`` is called synchronously.
    """
    # assumes the client sends 16-bit mono PCM, as implied by the
    # ``SAMPLE_RATE * 2 * 3`` "3 seconds" sizing in the caller — TODO confirm
    fd, audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        with wave.open(audio_path, "wb") as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(frames)
        segments = model.transcribe(audio_path)
        return " ".join(seg.text for seg in segments).strip()
    finally:
        os.remove(audio_path)


async def websocket_transcription_pcm(websocket: WebSocket):
    """Stream raw PCM audio over a WebSocket and send back partial transcripts.

    Protocol:
      * binary frames are appended to a buffer; once it holds at least
        ``MIN_PCM_SIZE`` bytes (3 seconds at 16 kHz, 2 bytes per sample) the
        buffered audio is wrapped in a WAV container and transcribed, and any
        non-empty text is sent back;
      * the binary frame ``b"__END__"`` ends the stream: remaining audio is
        transcribed, then ``"[stream ended]"`` is sent and the socket closed.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on ``app``.
    await websocket.accept()

    SAMPLE_RATE = 16000
    BYTES_PER_SAMPLE = 2
    MIN_PCM_SIZE = SAMPLE_RATE * BYTES_PER_SAMPLE * 3  # 3 seconds buffer
    pending = bytearray()
    client_connected = True

    try:
        while True:
            chunk = await websocket.receive_bytes()

            # If end control message (optional)
            if chunk == b"__END__":
                break
            if not chunk:
                continue
            pending += chunk

            if len(pending) >= MIN_PCM_SIZE:
                # Only whole samples go into the WAV; a dangling odd byte
                # stays buffered until its second half arrives.
                usable = len(pending) - len(pending) % BYTES_PER_SAMPLE
                text = _transcribe_pcm_frames(pending[:usable], SAMPLE_RATE)
                del pending[:usable]
                if text:
                    await websocket.send_text(text)

        # Flush audio shorter than MIN_PCM_SIZE instead of dropping it.
        usable = len(pending) - len(pending) % BYTES_PER_SAMPLE
        if usable:
            text = _transcribe_pcm_frames(pending[:usable], SAMPLE_RATE)
            if text:
                await websocket.send_text(text)

        await websocket.send_text("[stream ended]")
    except WebSocketDisconnect:
        # Client disconnected; nothing left to send. Other errors (and task
        # cancellation) propagate after the cleanup below instead of being
        # silently swallowed.
        client_connected = False
    finally:
        # Only close a socket the client has not already closed.
        if client_connected:
            await websocket.close()
if __name__ == "__main__":
    # Local development entry point: serve the app on all interfaces, port 7860.
    # On Spaces, you don't usually run uvicorn manually.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )