# app.py — Faster Whisper speech-to-text service (FastAPI app for Hugging Face Spaces).
import asyncio
import json
import os
import secrets
import shutil
import sys
import tempfile
from typing import Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from faster_whisper import WhisperModel
# Create the FastAPI application instance.
app = FastAPI(
    title="Faster Whisper Service",
    description="High-performance speech-to-text service using Faster Whisper",
    version="1.0.0"
)

# Allow cross-origin requests from any origin so browser front-ends hosted on
# other domains can call this service directly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Bearer-token scheme; auto_error=False so unauthenticated requests still reach
# verify_token(), which decides whether auth is actually enforced.
security = HTTPBearer(auto_error=False)

# Auth configuration: empty token and REQUIRE_AUTH=False leave the service
# open (suitable for a public demo Space).
API_TOKEN = ""
REQUIRE_AUTH = False

# Global Whisper model handle; populated by load_model() at startup,
# None while loading or if loading failed.
model = None
# WebSocket connection manager
class ConnectionManager:
    """Tracks active WebSocket connections and supports per-client and broadcast sends."""

    def __init__(self) -> None:
        # Currently-connected clients, in connection order.
        # (Annotation kept lazy/quoted so the class does not require the
        # WebSocket name at definition time.)
        self.active_connections: list["WebSocket"] = []

    async def connect(self, websocket: "WebSocket") -> None:
        """Accept the WebSocket handshake and register the client."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: "WebSocket") -> None:
        """Unregister a client; safe to call more than once for the same socket."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: "WebSocket") -> None:
        """Send a text frame to a single client."""
        await websocket.send_text(message)

    async def broadcast(self, message: str) -> None:
        """Send a text frame to every client, pruning any that have gone away.

        Iterates over a snapshot of the connection list: the original code
        removed entries from the list it was iterating, which skips the
        client following each dead connection.
        """
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Client disconnected mid-broadcast; drop it from the registry.
                self.disconnect(connection)


manager = ConnectionManager()
def load_model():
    """Load the Whisper model into the module-global ``model``.

    Attempts the multilingual ``large-v3`` checkpoint first (better support
    for many languages, including Russian) and falls back to ``base`` when it
    cannot be loaded.

    Returns:
        True when a model was loaded, False when both attempts failed
        (``model`` is left as None in that case).
    """
    global model
    print("๐Ÿ”„ Loading Whisper model...")
    try:
        model = WhisperModel("large-v3", compute_type="int8")
    except Exception as primary_error:
        print(f"โŒ Error loading large model: {primary_error}")
        print("๐Ÿ”„ Trying with base model as fallback...")
    else:
        print("โœ… Model loaded successfully")
        return True

    # Fallback path: smaller model, with extra diagnostics on failure.
    try:
        model = WhisperModel("base", compute_type="int8")
    except Exception as fallback_error:
        print(f"โŒ Error loading base model: {fallback_error}")
        print(f"Python version: {sys.version}")
        print(f"Current working directory: {os.getcwd()}")
        model = None
        return False
    print("โœ… Base model loaded successfully")
    return True
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """FastAPI dependency enforcing bearer-token auth when REQUIRE_AUTH is set.

    Returns the (possibly absent) credentials unchanged when auth is disabled
    or the token matches.

    Raises:
        HTTPException(401): auth required but no token supplied.
        HTTPException(403): a token was supplied but does not match API_TOKEN.
    """
    if REQUIRE_AUTH:
        if not credentials:
            raise HTTPException(
                status_code=401,
                detail="API token required",
                headers={"WWW-Authenticate": "Bearer"},
            )
        # Constant-time comparison avoids leaking the token via timing
        # (plain != short-circuits on the first differing character).
        if not secrets.compare_digest(credentials.credentials, API_TOKEN):
            raise HTTPException(
                status_code=403,
                detail="Invalid API token",
                headers={"WWW-Authenticate": "Bearer"},
            )
    return credentials
@app.on_event("startup")
async def startup_event():
    """Load the Whisper model once when the server starts.

    NOTE(review): ``on_event`` is deprecated in newer FastAPI releases in
    favor of lifespan handlers; behavior here is unchanged.
    """
    load_model()
@app.get("/")
async def root():
    """Liveness endpoint: confirms the service is running."""
    payload = {"message": "Faster Whisper Service is running"}
    return payload
@app.get("/health")
async def health_check(credentials: HTTPAuthorizationCredentials = Depends(verify_token)):
    """Report service health, model status, and supported features."""
    status_report = {
        "status": "healthy",
        "model_loaded": model is not None,
        "service": "faster-whisper",
        "auth_required": REQUIRE_AUTH,
        "auth_configured": bool(API_TOKEN),
        "vad_support": True,
        "websocket_support": True,
        "python_version": sys.version,
    }
    return status_report
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time transcription.

    Protocol: binary frames carry raw audio to transcribe; text frames carry
    JSON control messages (currently only {"type": "init"}).
    """
    await manager.connect(websocket)
    try:
        print("๐Ÿ”Œ WebSocket connection established")
        await manager.send_personal_message(
            json.dumps({
                "type": "connection",
                "status": "connected",
                "message": "WebSocket connection established"
            }),
            websocket
        )
        while True:
            try:
                message = await websocket.receive()
                if "bytes" in message:
                    await _handle_audio_frame(message["bytes"], websocket)
                elif "text" in message:
                    await _handle_text_frame(message["text"], websocket)
            except WebSocketDisconnect:
                # Must not be swallowed by the generic handler below, which
                # would try to send an error on the already-closed socket.
                raise
            except Exception as e:
                print(f"โŒ WebSocket processing error: {e}")
                error_result = {
                    "type": "error",
                    "message": str(e),
                    "success": False
                }
                await manager.send_personal_message(json.dumps(error_result), websocket)
    except WebSocketDisconnect:
        print("๐Ÿ”Œ WebSocket connection disconnected")
        manager.disconnect(websocket)
    except Exception as e:
        print(f"โŒ WebSocket error: {e}")
        manager.disconnect(websocket)


async def _handle_audio_frame(data: bytes, websocket: WebSocket) -> None:
    """Transcribe one binary audio chunk and send the result to the client."""
    print(f"๐ŸŽต WebSocket: Processing audio chunk ({len(data)} bytes)")
    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
        temp_file.write(data)
        temp_path = temp_file.name
    try:
        if model:
            segments, info = model.transcribe(temp_path)
            # transcribe() returns a generator; drain it before joining.
            segments_list = list(segments)
            transcription = " ".join([seg.text for seg in segments_list])
            result = {
                "type": "transcription",
                "text": transcription,
                "language": info.language,
                "language_probability": info.language_probability,
                "success": True
            }
            await manager.send_personal_message(json.dumps(result), websocket)
            print(f"โœ… WebSocket: Sent transcription: '{transcription}'")
        else:
            error_result = {
                "type": "error",
                "message": "Model not loaded",
                "success": False
            }
            await manager.send_personal_message(json.dumps(error_result), websocket)
    finally:
        # Always clean up — the original skipped the unlink (leaking the temp
        # file) whenever transcription or the send raised.
        os.unlink(temp_path)


async def _handle_text_frame(text: str, websocket: WebSocket) -> None:
    """Handle a JSON control message from the client."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        print(f"โš ๏ธ WebSocket: Invalid JSON received: {text}")
        return
    print(f"๐Ÿ“จ WebSocket: Received configuration: {data}")
    if data.get("type") == "init":
        await manager.send_personal_message(
            json.dumps({
                "type": "connection",
                "status": "initialized",
                "message": "Configuration received"
            }),
            websocket
        )
def _pick_extension(filename: Optional[str]) -> str:
    """Map the upload's extension to one faster-whisper/ffmpeg can demux."""
    if filename:
        original_extension = os.path.splitext(filename)[1].lower()
        if original_extension in ('.m4a', '.mp3', '.flac', '.ogg', '.webm'):
            return original_extension
        if original_extension in ('.mp4', '.avi', '.mov'):
            return '.mp4'
    return '.wav'


def _parse_vad_threshold(vad_parameters: Optional[str]) -> float:
    """Parse a 'key=value[,key=value...]' VAD string; return the threshold (default 0.5)."""
    threshold = 0.5
    if vad_parameters:
        try:
            for param in vad_parameters.split(','):
                if '=' in param:
                    key, value = param.strip().split('=')
                    if key == 'threshold':
                        threshold = float(value)
        except Exception as e:
            print(f"โš ๏ธ Warning: Failed to parse VAD parameters: {e}")
    return threshold


def _run_transcription(temp_path, language, task, vad_filter, vad_threshold):
    """Call model.transcribe with the requested options, degrading gracefully.

    Attempt order: with VAD (if requested) -> without VAD -> auto-detected
    language. Returns the (segments, info) pair from faster-whisper.
    """
    base_kwargs = {"task": task}
    if language:
        base_kwargs["language"] = language
    if vad_filter:
        print(f"๐Ÿ”Š Using VAD with threshold: {vad_threshold}")
        try:
            # vad_parameters must be a dict (or VadOptions). The original
            # passed the string "threshold=0.5", which faster-whisper rejects,
            # so the VAD path always fell through to the no-VAD fallback.
            return model.transcribe(
                temp_path,
                vad_filter=True,
                vad_parameters={"threshold": vad_threshold},
                **base_kwargs,
            )
        except Exception as vad_error:
            print(f"โš ๏ธ VAD error: {vad_error}")
            print("๐Ÿ”„ Trying without VAD...")
    try:
        return model.transcribe(temp_path, **base_kwargs)
    except Exception as transcription_error:
        print(f"โŒ Transcription failed: {transcription_error}")
        try:
            # Last resort: drop the explicit language and let the model decide.
            print("๐Ÿ”„ Retrying with auto language detection...")
            return model.transcribe(temp_path, task=task)
        except Exception as retry_error:
            print(f"โŒ Retry also failed: {retry_error}")
            raise transcription_error


def _clean_transcription(text: str) -> str:
    """Collapse runs of whitespace and trim the transcription text.

    (Replaces the original per-language post-processing, whose
    replace('..', '.') corrupted ellipses and whose replace('...', '...')
    was a no-op; whitespace normalization was the only effective step.)
    """
    return ' '.join(text.split())


@app.post("/transcribe")
async def transcribe(
    file: UploadFile = File(...),
    language: Optional[str] = Form(None),
    task: Optional[str] = Form("transcribe"),
    vad_filter: Optional[bool] = Form(False),
    vad_parameters: Optional[str] = Form("threshold=0.5"),
    credentials: HTTPAuthorizationCredentials = Depends(verify_token)
):
    """
    Transcribe an uploaded audio/video file to text, with optional VAD.

    Form fields:
        file: audio (wav/m4a/mp3/flac/ogg/webm) or video (mp4/avi/mov), <= 100MB.
        language: ISO language code; omit for auto-detection.
        task: "transcribe" or "translate".
        vad_filter: enable voice-activity detection.
        vad_parameters: "threshold=<float>[,...]" string.

    Returns JSON with the text, detected language, and the options used;
    HTTP 400 for bad input, HTTP 500 with diagnostics on failure.
    """
    temp_path = None
    file_size = "unknown"
    file_extension = "unknown"
    info = None
    try:
        print(f"๐ŸŽต Starting transcription for file: {file.filename}")
        if model is None:
            print("โŒ Model not loaded")
            return JSONResponse(
                status_code=500,
                content={"error": "Model not loaded", "success": False}
            )
        if not file.filename:
            print("โŒ No file provided")
            return JSONResponse(
                status_code=400,
                content={"error": "No file provided", "success": False}
            )
        # Determine upload size by seeking to the end of the spooled file.
        file.file.seek(0, 2)
        file_size = file.file.tell()
        file.file.seek(0)
        print(f"๐Ÿ“ File size: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)")
        max_file_size = 100 * 1024 * 1024  # 100MB cap for Hugging Face Spaces
        if file_size > max_file_size:
            print(f"โŒ File too large: {file_size / 1024 / 1024:.2f} MB")
            return JSONResponse(
                status_code=400,
                content={
                    "error": f"File too large. Maximum size is 100MB. Your file: {file_size / 1024 / 1024:.2f} MB",
                    "success": False,
                    "file_size_mb": file_size / 1024 / 1024,
                    "max_size_mb": 100
                }
            )
        # Persist the upload with a meaningful extension so the right demuxer
        # is chosen for formats like M4A.
        file_extension = _pick_extension(file.filename)
        print(f"๐Ÿ“ Original file: {file.filename}")
        print(f"๐Ÿ“ Using extension: {file_extension}")
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
            shutil.copyfileobj(file.file, temp_file)
            temp_path = temp_file.name
        print(f"โœ… Temporary file created: {temp_path}")
        vad_threshold = _parse_vad_threshold(vad_parameters) if vad_filter else 0.5
        print(f"๐ŸŽค Starting transcription (task={task}, language={language or 'auto'}, vad={vad_filter})")
        segments, info = _run_transcription(temp_path, language, task, vad_filter, vad_threshold)
        # transcribe() yields a generator; drain it before joining.
        segments_list = list(segments)
        transcription = _clean_transcription(" ".join(seg.text for seg in segments_list))
        print(f"๐Ÿ“ Transcription result: '{transcription}'")
        print(f"๐ŸŒ Detected language: {info.language} (probability: {info.language_probability})")
        print(f"๐Ÿ“Š Number of segments: {len(segments_list)}")
        result = {
            "success": True,
            "text": transcription,
            "language": info.language,
            "language_probability": info.language_probability,
            "vad_enabled": vad_filter,
            "vad_threshold": vad_threshold if vad_filter else None,
            "model_used": "large-v3" if "large-v3" in str(model) else "base",
            "task_used": task
        }
        print("โœ… Request completed successfully")
        return result
    except Exception as e:
        error_msg = str(e)
        error_type = type(e).__name__
        print(f"โŒ Transcription error ({error_type}): {error_msg}")
        diagnostic_info = {
            "file_size": file_size,
            "file_name": file.filename if file.filename else "unknown",
            "file_extension": file_extension,
            "language_requested": language if language else "auto",
            "task_requested": task,
            "vad_enabled": vad_filter,
            "model_loaded": model is not None,
            "model_type": "large-v3" if model and "large-v3" in str(model) else "base" if model else "none",
            "detected_language": info.language if info is not None else "unknown",
            "language_confidence": info.language_probability if info is not None else "unknown"
        }
        return JSONResponse(
            status_code=500,
            content={
                "error": error_msg,
                "error_type": error_type,
                "success": False,
                "diagnostic_info": diagnostic_info
            }
        )
    finally:
        # Always remove the temp file — the original leaked it whenever an
        # exception skipped the inline unlink on the success path.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
            print(f"๐Ÿงน Temporary file cleaned: {temp_path}")
@app.post("/detect-language")
async def detect_language(
    file: UploadFile = File(...),
    credentials: HTTPAuthorizationCredentials = Depends(verify_token)
):
    """
    Detect the spoken language of an uploaded audio file.

    Returns JSON with the detected language code and its probability;
    HTTP 400 when no file is supplied, HTTP 500 on failure.
    """
    temp_path = None
    try:
        print(f"๐ŸŒ Starting language detection for file: {file.filename}")
        if model is None:
            print("โŒ Model not loaded")
            return JSONResponse(
                status_code=500,
                content={"error": "Model not loaded", "success": False}
            )
        if not file.filename:
            print("โŒ No file provided")
            return JSONResponse(
                status_code=400,
                content={"error": "No file provided", "success": False}
            )
        print("๐Ÿ“ Creating temporary file...")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
            shutil.copyfileobj(file.file, temp_file)
            temp_path = temp_file.name
        print(f"โœ… Temporary file created: {temp_path}")
        print("๐ŸŒ Detecting language...")
        # transcribe() is used for its language-detection byproduct; the
        # segment generator is drained defensively (as the original did).
        segments, info = model.transcribe(temp_path)
        list(segments)
        print(f"โœ… Language detected: {info.language} (probability: {info.language_probability:.2f})")
        return JSONResponse(content={
            "success": True,
            "language": info.language,
            "language_probability": info.language_probability
        })
    except Exception as e:
        error_msg = str(e)
        error_type = type(e).__name__
        print(f"โŒ Language detection error ({error_type}): {error_msg}")
        return JSONResponse(
            status_code=500,
            content={
                "error": error_msg,
                "error_type": error_type,
                "success": False
            }
        )
    finally:
        # Clean up on both success and failure paths — the original leaked
        # the temp file if an exception fired between creation and unlink.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
            print(f"๐Ÿงน Temporary file cleaned: {temp_path}")
# For Hugging Face Spaces compatibility
if __name__ == "__main__":
    import uvicorn
    # Spaces expects the app to listen on port 7860 on all interfaces.
    uvicorn.run(app, host="0.0.0.0", port=7860)