# testcppp / app.py
from fastapi import FastAPI, UploadFile, File, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pywhispercpp.model import Model
import uvicorn
import tempfile
import os
import wave
from time import time

app = FastAPI(title="pyWhisperCPP Streaming API")
# Allow CORS (useful if you host the frontend separately, but fine on Spaces too)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Load Whisper.cpp model ONCE
# OPTIONS: 'tiny.en', 'base.en', etc.
model = Model("base.en")
# ---------- Simple HTML frontend ----------
@app.get("/", response_class=HTMLResponse)
async def index():
    # Serve the index.html file
    with open("index.html", "r", encoding="utf-8") as f:
        return HTMLResponse(f.read())
# ---------- Normal file upload transcription ----------
@app.post("/transcribe")
async def transcribe(file: UploadFile = File(...)):
    # Save the uploaded audio to a temporary file so the model can read it from disk
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp:
        temp.write(await file.read())
        temp.flush()
        audio_path = temp.name

    try:
        start = time()
        segments = model.transcribe(audio_path)
        text = " ".join(seg.text for seg in segments)
        elapsed = round(time() - start, 3)
        return {
            "text": text,
            "processing_time_seconds": elapsed,
        }
    finally:
        # Always clean up the temporary file
        os.remove(audio_path)
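# Example client call: a minimal sketch, assuming the server is reachable on
# localhost:7860 and a local file named "sample.wav" exists (both names are
# assumptions, not part of this app):
#
#   import requests
#   with open("sample.wav", "rb") as f:
#       r = requests.post("http://localhost:7860/transcribe", files={"file": f})
#   print(r.json()["text"])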
# ---------- WebSocket streaming transcription ----------
@app.websocket("/ws/transcribe_stream")
async def websocket_transcription(websocket: WebSocket):
    """
    Receives binary audio chunks (WebM/Opus) from the browser via WebSocket,
    periodically transcribes the buffered audio with Whisper.cpp,
    and sends back partial text.
    """
    await websocket.accept()
    buffer = b""
    MIN_CHUNK_SIZE = 40_000  # bytes to buffer before running a transcription (tune this)

    try:
        while True:
            message = await websocket.receive()

            # Client closed the socket without sending __END__
            if message.get("type") == "websocket.disconnect":
                raise WebSocketDisconnect(message.get("code", 1000))

            # Handle text messages (control)
            if "text" in message and message["text"] is not None:
                text_msg = message["text"]
                if text_msg == "__END__":
                    # Finish stream
                    break
                # Ignore other text controls for now
                continue

            # Handle binary audio data
            chunk = message.get("bytes")
            if not chunk:
                continue
            buffer += chunk

            # When enough audio has been collected, transcribe it
            if len(buffer) >= MIN_CHUNK_SIZE:
                with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as temp:
                    temp.write(buffer)
                    temp.flush()
                    audio_path = temp.name
                try:
                    segments = model.transcribe(audio_path)
                    text = " ".join(seg.text for seg in segments).strip()
                finally:
                    os.remove(audio_path)

                # Send partial transcript to client
                if text:
                    await websocket.send_text(text)

                # Clear buffer (or keep a tail if you want overlap).
                # Note: WebM chunks after the first carry no container header,
                # so buffers built after this reset may fail to decode.
                buffer = b""

        # End-of-stream message
        await websocket.send_text("[stream ended]")
    except WebSocketDisconnect:
        # Client disconnected
        pass
    finally:
        await websocket.close()
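# Example streaming client: a minimal sketch, assuming the "websockets" package
# is installed and a local WebM/Opus recording "sample.webm" exists (the file
# name and chunk size are assumptions; in the real app the browser's
# MediaRecorder sends the chunks):
#
#   import asyncio
#   import websockets
#
#   async def stream_file():
#       async with websockets.connect("ws://localhost:7860/ws/transcribe_stream") as ws:
#           with open("sample.webm", "rb") as f:
#               while chunk := f.read(20_000):
#                   await ws.send(chunk)            # binary audio chunk
#           await ws.send("__END__")                # text control message
#           while True:
#               msg = await ws.recv()
#               print(msg)
#               if msg == "[stream ended]":
#                   break
#
#   asyncio.run(stream_file())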
@app.websocket("/ws/transcribe_pcm")
async def websocket_transcription_pcm(websocket: WebSocket):
    """
    Receives raw 16-bit, 16 kHz PCM as binary WebSocket messages,
    transcribes roughly every 3 seconds of buffered audio,
    and sends back partial text.
    """
    await websocket.accept()
    buffer = b""
    SAMPLE_RATE = 16000
    MIN_PCM_SIZE = SAMPLE_RATE * 2 * 3  # 3 seconds of 16-bit samples

    try:
        while True:
            chunk = await websocket.receive_bytes()

            # Optional end-of-stream control message
            if chunk == b"__END__":
                break
            buffer += chunk

            if len(buffer) >= MIN_PCM_SIZE:
                # Wrap the raw PCM in a WAV header so the file can be decoded
                with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp:
                    audio_path = temp.name
                    with wave.open(temp, "wb") as wav_file:
                        wav_file.setnchannels(1)  # assumes mono input
                        wav_file.setsampwidth(2)  # 16-bit samples
                        wav_file.setframerate(SAMPLE_RATE)
                        wav_file.writeframes(buffer)
                try:
                    segments = model.transcribe(audio_path)
                    text = " ".join(seg.text for seg in segments).strip()
                    if text:
                        await websocket.send_text(text)
                finally:
                    buffer = b""
                    os.remove(audio_path)

        await websocket.send_text("[stream ended]")
    except WebSocketDisconnect:
        # Client disconnected
        pass
    finally:
        await websocket.close()
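# Example PCM client: a minimal sketch, assuming the "websockets" package is
# installed and "sample16k.wav" is a 16 kHz, 16-bit, mono WAV file (the file
# name and chunking are assumptions, not part of this app):
#
#   import asyncio
#   import wave
#   import websockets
#
#   async def stream_pcm():
#       async with websockets.connect("ws://localhost:7860/ws/transcribe_pcm") as ws:
#           with wave.open("sample16k.wav", "rb") as wav_file:
#               while frames := wav_file.readframes(16000):   # roughly 1 s of audio
#                   await ws.send(frames)                     # raw PCM bytes
#           await ws.send(b"__END__")
#           while True:
#               msg = await ws.recv()
#               print(msg)
#               if msg == "[stream ended]":
#                   break
#
#   asyncio.run(stream_pcm())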
if __name__ == "__main__":
    # For local testing. On Spaces, you don't usually run uvicorn manually.
    uvicorn.run(app, host="0.0.0.0", port=7860)
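# Alternatively (assuming this file is saved as app.py), start the server with
# the uvicorn CLI:
#
#   uvicorn app:app --host 0.0.0.0 --port 7860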