testcppp / app.py
Charan5775's picture
Update app.py
f3dd1db verified
raw
history blame
3.71 kB
from fastapi import FastAPI, UploadFile, File, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pywhispercpp.model import Model
import uvicorn
import tempfile
import os
from time import time
# FastAPI application instance; the title appears in the auto-generated /docs UI.
app = FastAPI(title="pyWhisperCPP Streaming API")
# Allow CORS (useful if you host frontend separately, but fine on Spaces too)
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers under the CORS spec (a wildcard origin cannot be paired
# with credentials) — confirm whether credentialed requests are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Load Whisper.cpp model ONCE
# OPTIONS: 'tiny.en', 'base.en', etc.
# Loaded at import time so all requests share a single in-memory model instance.
model = Model("base.en")
# ---------- Simple HTML frontend ----------
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the single-page frontend from index.html in the working directory.

    Returns:
        HTMLResponse with the page contents, or a 404 HTML page when
        index.html is missing (previously this raised FileNotFoundError,
        surfacing to clients as an unhandled 500).
    """
    try:
        with open("index.html", "r", encoding="utf-8") as f:
            return HTMLResponse(f.read())
    except FileNotFoundError:
        # Fail gracefully instead of a 500 when the frontend file is absent.
        return HTMLResponse("<h1>index.html not found</h1>", status_code=404)
# ---------- Normal file upload transcription ----------
@app.post("/transcribe")
async def transcribe(file: UploadFile = File(...)):
    """Transcribe one uploaded audio file in a single pass.

    Args:
        file: Audio upload (multipart form field). The whole payload is read
            into memory, written to a temp file, and fed to whisper.cpp.

    Returns:
        dict with the joined transcript text and wall-clock processing time.
    """
    # Preserve the upload's extension so whisper.cpp's decoder sees the right
    # container format; fall back to the previous hard-coded ".wav".
    suffix = os.path.splitext(file.filename or "")[1] or ".wav"
    # Save uploaded audio temporarily (delete=False: the path must outlive
    # the `with` block so model.transcribe can open it).
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
        temp.write(await file.read())
        temp.flush()
        audio_path = temp.name
    try:
        start = time()
        segments = model.transcribe(audio_path)
        text = " ".join(seg.text for seg in segments)
        elapsed = round(time() - start, 3)
        return {
            "text": text,
            "processing_time_seconds": elapsed,
        }
    finally:
        # Always remove the temp file, even if transcription raises.
        os.remove(audio_path)
# ---------- WebSocket streaming transcription ----------
@app.websocket("/ws/transcribe_stream")
async def websocket_transcription(websocket: WebSocket):
    """
    Receives binary audio chunks (WebM/Opus) from the browser via WebSocket,
    periodically transcribes the buffered audio with Whisper.cpp,
    and sends back partial text.

    Control protocol: a text frame "__END__" ends the stream; other text
    frames are ignored. Binary frames are appended to a buffer that is
    transcribed whenever it reaches MIN_CHUNK_SIZE bytes.
    """
    await websocket.accept()
    buffer = b""
    MIN_CHUNK_SIZE = 40_000  # bytes buffered before each transcription pass (tune this)
    connected = True  # tracks whether we still owe the peer a close frame
    try:
        while True:
            message = await websocket.receive()
            # Handle text messages (control)
            if message.get("text") is not None:
                if message["text"] == "__END__":
                    break  # finish stream
                continue  # ignore other text controls for now
            # Handle binary audio data
            chunk = message.get("bytes")
            if not chunk:
                continue
            buffer += chunk
            # When enough audio collected, transcribe
            if len(buffer) >= MIN_CHUNK_SIZE:
                # NOTE(review): after the first flush the buffer no longer
                # starts with a WebM container header, so whisper.cpp likely
                # cannot decode later chunks — consider retaining the header
                # bytes or streaming raw PCM instead. Verify with the client.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as temp:
                    temp.write(buffer)
                    temp.flush()
                    audio_path = temp.name
                try:
                    segments = model.transcribe(audio_path)
                    text = " ".join(seg.text for seg in segments).strip()
                finally:
                    os.remove(audio_path)
                # Send partial transcript to client
                if text:
                    await websocket.send_text(text)
                # Clear buffer (or keep tail if you want overlap)
                buffer = b""
        # End-of-stream message
        await websocket.send_text("[stream ended]")
    except WebSocketDisconnect:
        # Client disconnected; the socket is already gone — nothing to send.
        connected = False
    finally:
        # BUGFIX: previously close() was called unconditionally, which raises
        # RuntimeError in Starlette when the client has already disconnected.
        if connected:
            await websocket.close()
if __name__ == "__main__":
    # For local testing. On Spaces, you don't usually run uvicorn manually.
    # Port 7860 is the port Hugging Face Spaces expects the app to listen on.
    uvicorn.run(app, host="0.0.0.0", port=7860)