# Faster Whisper Service — FastAPI speech-to-text app (Hugging Face Spaces deployment)
import asyncio
import json
import os
import secrets
import shutil
import sys
import tempfile
from typing import Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from faster_whisper import WhisperModel
# Create FastAPI app
app = FastAPI(
    title="Faster Whisper Service",
    description="High-performance speech-to-text service using Faster Whisper",
    version="1.0.0"
)

# Add CORS middleware.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers under the CORS spec (a wildcard origin may not be used
# with credentialed requests) — pin concrete origins if credentials are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Security: bearer-token scheme. auto_error=False so unauthenticated requests
# still reach verify_token(), which decides whether auth is actually enforced.
security = HTTPBearer(auto_error=False)

# Configuration — overridable via environment variables so the service can be
# secured without editing source. Defaults preserve the original behavior
# (no auth, empty token) when the variables are unset.
API_TOKEN = os.environ.get("API_TOKEN", "")
REQUIRE_AUTH = os.environ.get("REQUIRE_AUTH", "").lower() in ("1", "true", "yes")

# Global model handle; populated by load_model() at startup, None until then.
model = None
# WebSocket connection manager
class ConnectionManager:
    """Tracks active WebSocket connections and broadcasts messages to them."""

    def __init__(self):
        # Annotation is quoted so the class can be imported/inspected without
        # evaluating the fastapi WebSocket name at definition time.
        self.active_connections: list["WebSocket"] = []

    async def connect(self, websocket: "WebSocket"):
        """Accept the handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: "WebSocket"):
        """Unregister a connection; safe to call more than once.

        The original raised ValueError on double-disconnect, which the
        endpoint's multiple except branches could trigger.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: "WebSocket"):
        """Send a text frame to a single client."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send a text frame to every client, pruning dead connections.

        Iterates over a snapshot and defers removal: the original removed
        items from the list while iterating it, which silently skipped the
        client following every failed send. Only Exception is caught so
        KeyboardInterrupt/SystemExit still propagate (original used bare
        ``except:``).
        """
        dead = []
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Remove disconnected clients after the loop completes.
                dead.append(connection)
        for connection in dead:
            self.disconnect(connection)


manager = ConnectionManager()
def load_model():
    """Load the Whisper model into the global ``model``.

    The preferred model is taken from the WHISPER_MODEL environment variable
    (default "large-v3", which has the best multi-language support, including
    Russian). Falls back to the small "base" model when the preferred model
    cannot be loaded.

    Returns:
        True when a model (preferred or fallback) was loaded, False otherwise.
    """
    global model
    # Generalized: previously hard-coded to "large-v3"; default is unchanged.
    preferred = os.environ.get("WHISPER_MODEL", "large-v3")
    try:
        print("๐ Loading Whisper model...")
        model = WhisperModel(preferred, compute_type="int8")
        print("โ Model loaded successfully")
        return True
    except Exception as e:
        print(f"โ Error loading {preferred} model: {e}")
        print("๐ Trying with base model as fallback...")
        try:
            model = WhisperModel("base", compute_type="int8")
            print("โ Base model loaded successfully")
            return True
        except Exception as e2:
            print(f"โ Error loading base model: {e2}")
            # Extra diagnostics to debug deployment/environment issues.
            print(f"Python version: {sys.version}")
            print(f"Current working directory: {os.getcwd()}")
            model = None
            return False
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the API bearer token when authentication is required.

    When REQUIRE_AUTH is False this is a pass-through. Otherwise raises
    HTTP 401 when no credentials were supplied and HTTP 403 when the token
    does not match API_TOKEN.

    Returns:
        The (possibly None) credentials, for downstream dependencies.
    """
    if REQUIRE_AUTH:
        if not credentials:
            raise HTTPException(
                status_code=401,
                detail="API token required",
                headers={"WWW-Authenticate": "Bearer"},
            )
        # Constant-time comparison: a plain != leaks timing information about
        # how long a matching prefix of the token is.
        if not secrets.compare_digest(credentials.credentials, API_TOKEN):
            raise HTTPException(
                status_code=403,
                detail="Invalid API token",
                headers={"WWW-Authenticate": "Bearer"},
            )
    return credentials
async def startup_event():
    """Load model on startup.

    NOTE(review): no @app.on_event("startup") / lifespan registration is
    visible in this file — confirm this coroutine is actually wired up,
    otherwise the model is never loaded before the first request.
    """
    load_model()
async def root():
    """Landing endpoint: confirm the service is up.

    NOTE(review): no @app.get("/") decorator is visible in this file —
    confirm the route is registered.
    """
    payload = {"message": "Faster Whisper Service is running"}
    return payload
async def health_check(credentials: HTTPAuthorizationCredentials = Depends(verify_token)):
    """Report service health: model state, auth config, supported features."""
    status = {"status": "healthy"}
    status["model_loaded"] = model is not None
    status["service"] = "faster-whisper"
    status["auth_required"] = REQUIRE_AUTH
    status["auth_configured"] = bool(API_TOKEN)
    status["vad_support"] = True
    status["websocket_support"] = True
    status["python_version"] = sys.version
    return status
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time transcription.

    Protocol: binary frames are treated as complete audio clips and
    transcribed; text frames are JSON control messages (currently only
    {"type": "init"} is recognized). Results and errors are sent back to the
    sender as JSON text frames.

    NOTE(review): no @app.websocket(...) route decorator is visible in this
    file — confirm the route is registered.
    """
    await manager.connect(websocket)
    try:
        print("๐ WebSocket connection established")
        await manager.send_personal_message(
            json.dumps({
                "type": "connection",
                "status": "connected",
                "message": "WebSocket connection established"
            }),
            websocket
        )
        while True:
            try:
                # Receive data from client
                message = await websocket.receive()
                # Handle different message types
                if "bytes" in message:
                    # Binary audio data
                    data = message["bytes"]
                    print(f"๐ต WebSocket: Processing audio chunk ({len(data)} bytes)")
                    # Save audio data to temporary file
                    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
                        temp_file.write(data)
                        temp_path = temp_file.name
                    try:
                        # Transcribe audio
                        if model:
                            segments, info = model.transcribe(temp_path)
                            # Materialize the lazy generator before joining.
                            segments_list = list(segments)
                            transcription = " ".join([seg.text for seg in segments_list])
                            # Send transcription result
                            result = {
                                "type": "transcription",
                                "text": transcription,
                                "language": info.language,
                                "language_probability": info.language_probability,
                                "success": True
                            }
                            await manager.send_personal_message(json.dumps(result), websocket)
                            print(f"โ WebSocket: Sent transcription: '{transcription}'")
                        else:
                            error_result = {
                                "type": "error",
                                "message": "Model not loaded",
                                "success": False
                            }
                            await manager.send_personal_message(json.dumps(error_result), websocket)
                    finally:
                        # Always remove the temp file — the original skipped
                        # cleanup when transcription raised, leaking one file
                        # per failed chunk.
                        os.unlink(temp_path)
                elif "text" in message:
                    # Text message (JSON configuration)
                    try:
                        data = json.loads(message["text"])
                        print(f"๐จ WebSocket: Received configuration: {data}")
                        if data.get("type") == "init":
                            # Handle initialization
                            await manager.send_personal_message(
                                json.dumps({
                                    "type": "connection",
                                    "status": "initialized",
                                    "message": "Configuration received"
                                }),
                                websocket
                            )
                    except json.JSONDecodeError:
                        print(f"โ ๏ธ WebSocket: Invalid JSON received: {message['text']}")
            except Exception as e:
                # Per-message failures are reported to the client without
                # tearing down the connection.
                print(f"โ WebSocket processing error: {e}")
                error_result = {
                    "type": "error",
                    "message": str(e),
                    "success": False
                }
                await manager.send_personal_message(json.dumps(error_result), websocket)
    except WebSocketDisconnect:
        print("๐ WebSocket connection disconnected")
        try:
            manager.disconnect(websocket)
        except ValueError:
            pass  # already removed (e.g. pruned by a failed broadcast)
    except Exception as e:
        print(f"โ WebSocket error: {e}")
        try:
            manager.disconnect(websocket)
        except ValueError:
            pass  # already removed
async def transcribe(
    file: UploadFile = File(...),
    language: Optional[str] = Form(None),
    task: Optional[str] = Form("transcribe"),
    vad_filter: Optional[bool] = Form(False),
    vad_parameters: Optional[str] = Form("threshold=0.5"),
    credentials: HTTPAuthorizationCredentials = Depends(verify_token)
):
    """
    Transcribe audio file to text with optional VAD support.

    Form fields:
        language:        ISO language code to force; None = auto-detect.
        task:            "transcribe" or "translate".
        vad_filter:      enable voice-activity-detection filtering.
        vad_parameters:  comma-separated key=value pairs; only "threshold"
                         is currently honored.

    Returns a JSON dict with the text, detected language and settings used,
    or a JSONResponse carrying error details and diagnostics on failure.

    NOTE(review): no @app.post(...) route decorator is visible in this file —
    confirm the route is registered.
    """
    temp_path = None
    try:
        print(f"๐ต Starting transcription for file: {file.filename}")
        # Check if model is loaded
        if model is None:
            print("โ Model not loaded")
            return JSONResponse(
                status_code=500,
                content={"error": "Model not loaded", "success": False}
            )
        # Validate file
        if not file.filename:
            print("โ No file provided")
            return JSONResponse(
                status_code=400,
                content={"error": "No file provided", "success": False}
            )
        # Validate file size by seeking to the end of the spooled upload
        # (100MB limit for Hugging Face Spaces).
        file.file.seek(0, 2)
        file_size = file.file.tell()
        file.file.seek(0)
        print(f"๐ File size: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)")
        max_file_size = 100 * 1024 * 1024  # 100MB
        if file_size > max_file_size:
            print(f"โ File too large: {file_size / 1024 / 1024:.2f} MB")
            return JSONResponse(
                status_code=400,
                content={
                    "error": f"File too large. Maximum size is 100MB. Your file: {file_size / 1024 / 1024:.2f} MB",
                    "success": False,
                    "file_size_mb": file_size / 1024 / 1024,
                    "max_size_mb": 100
                }
            )
        # Create a temporary file whose extension matches the upload so the
        # audio decoder can sniff the container format.
        print("๐ Creating temporary file...")
        file_extension = '.wav'  # default
        if file.filename:
            original_extension = os.path.splitext(file.filename)[1].lower()
            if original_extension in ['.m4a', '.mp3', '.flac', '.ogg', '.webm']:
                file_extension = original_extension
            elif original_extension in ['.mp4', '.avi', '.mov']:
                file_extension = '.mp4'
        print(f"๐ Original file: {file.filename}")
        print(f"๐ Using extension: {file_extension}")
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
            shutil.copyfileobj(file.file, temp_file)
            temp_path = temp_file.name
        print(f"โ Temporary file created: {temp_path}")
        if file_extension == '.m4a':
            print("๐ต M4A file detected, ensuring proper processing...")
            # Placeholder: special M4A handling can be added here if needed.
        # Parse VAD parameters ("key=value" pairs, comma-separated).
        vad_threshold = 0.5  # default
        if vad_filter and vad_parameters:
            try:
                for param in vad_parameters.split(','):
                    if '=' in param:
                        key, value = param.strip().split('=')
                        if key == 'threshold':
                            vad_threshold = float(value)
            except Exception as e:
                print(f"โ ๏ธ Warning: Failed to parse VAD parameters: {e}")
        # Build the transcribe() keyword arguments once instead of repeating
        # every language/VAD combination as a separate call site.
        print("๐ค Starting transcription...")
        print(f"๐ฏ Task parameter: {task}")
        print(f"๐ Language parameter: {language}")
        base_kwargs = {"task": task}
        if language:
            base_kwargs["language"] = language
        try:
            if vad_filter:
                print(f"๐ Using VAD with threshold: {vad_threshold}")
                try:
                    # BUG FIX: faster-whisper expects vad_parameters as a dict
                    # (or VadOptions); the original passed the string
                    # "threshold=...", which raised on every request and
                    # silently disabled VAD via the fallback below.
                    segments, info = model.transcribe(
                        temp_path,
                        vad_filter=True,
                        vad_parameters={"threshold": vad_threshold},
                        **base_kwargs
                    )
                    print(f"โ VAD transcription completed successfully")
                except Exception as vad_error:
                    print(f"โ ๏ธ VAD error: {vad_error}")
                    print(f"๐ Trying without VAD...")
                    # Fallback to transcription without VAD
                    segments, info = model.transcribe(temp_path, **base_kwargs)
                    print(f"โ Fallback transcription completed")
            else:
                print("๐ค Starting transcription without VAD...")
                segments, info = model.transcribe(temp_path, **base_kwargs)
                print(f"โ Transcription completed successfully")
        except Exception as transcription_error:
            print(f"โ Transcription failed: {transcription_error}")
            # Second attempt with automatic language detection, in case the
            # forced language was the cause of the failure.
            try:
                print("๐ Retrying with auto language detection...")
                segments, info = model.transcribe(temp_path, task=task)
                print(f"โ Retry successful with auto detection")
            except Exception as retry_error:
                print(f"โ Retry also failed: {retry_error}")
                raise transcription_error
        # Collect transcription results (drain the lazy generator first).
        segments_list = list(segments)
        transcription = " ".join([seg.text for seg in segments_list])
        print(f"๐ Raw transcription result: '{transcription}'")
        print(f"๐ Detected language: {info.language} (probability: {info.language_probability})")
        print(f"๐ฏ Task used: {task}")
        print(f"๐ Number of segments: {len(segments_list)}")
        # Language-specific post-processing (whitespace/punctuation cleanup).
        if info.language == 'ru' and transcription:
            print("๐ท๐บ Russian text detected, applying post-processing...")
            transcription = transcription.strip()
            # Collapse doubled periods. (The original also had a no-op
            # replace('...', '...'), removed here.)
            transcription = transcription.replace('..', '.')
            transcription = ' '.join(transcription.split())
            print(f"๐ท๐บ Post-processed Russian text: '{transcription}'")
        elif info.language == 'ar' and transcription:
            print("๐ธ๐ฆ Arabic text detected, applying post-processing...")
            transcription = transcription.strip()
            transcription = ' '.join(transcription.split())
            print(f"๐ธ๐ฆ Post-processed Arabic text: '{transcription}'")
        else:
            print("๐ Applying general post-processing...")
            transcription = transcription.strip()
            transcription = ' '.join(transcription.split())
            print(f"๐ Post-processed text: '{transcription}'")
        # Clean up temporary file
        os.unlink(temp_path)
        print(f"๐งน Temporary file cleaned: {temp_path}")
        result = {
            "success": True,
            "text": transcription,
            "language": info.language,
            "language_probability": info.language_probability,
            "vad_enabled": vad_filter,
            "vad_threshold": vad_threshold if vad_filter else None,
            # NOTE(review): str(model) is the default object repr and never
            # contains "large-v3", so this always reports "base" — track the
            # loaded model name explicitly if this field matters.
            "model_used": "large-v3" if "large-v3" in str(model) else "base",
            "task_used": task
        }
        print(f"โ Request completed successfully")
        print(f"๐ค Returning result with task={task}, language={info.language}")
        return result
    except Exception as e:
        # Clean up temporary file in case of error
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
            print(f"๐งน Temporary file cleaned after error: {temp_path}")
        error_msg = str(e)
        error_type = type(e).__name__
        print(f"โ Transcription error ({error_type}): {error_msg}")
        # Additional diagnostic information; locals() guards cover the case
        # where the failure happened before a variable was bound.
        diagnostic_info = {
            "file_size": file_size if 'file_size' in locals() else "unknown",
            "file_name": file.filename if file.filename else "unknown",
            "file_extension": file_extension if 'file_extension' in locals() else "unknown",
            "language_requested": language if 'language' in locals() else "auto",
            "task_requested": task if 'task' in locals() else "transcribe",
            "vad_enabled": vad_filter if 'vad_filter' in locals() else False,
            "model_loaded": model is not None,
            "model_type": "large-v3" if model and "large-v3" in str(model) else "base" if model else "none",
            "detected_language": info.language if 'info' in locals() else "unknown",
            "language_confidence": info.language_probability if 'info' in locals() else "unknown"
        }
        return JSONResponse(
            status_code=500,
            content={
                "error": error_msg,
                "error_type": error_type,
                "success": False,
                "diagnostic_info": diagnostic_info
            }
        )
async def detect_language(
    file: UploadFile = File(...),
    credentials: HTTPAuthorizationCredentials = Depends(verify_token)
):
    """Detect the language of an uploaded audio file.

    Runs a transcription pass and returns only the language information
    (code and probability) as JSON.
    """
    temp_path = None
    try:
        print(f"๐ Starting language detection for file: {file.filename}")
        # Bail out early when the model failed to load.
        if model is None:
            print("โ Model not loaded")
            return JSONResponse(
                status_code=500,
                content={"error": "Model not loaded", "success": False}
            )
        # Bail out early when no upload was supplied.
        if not file.filename:
            print("โ No file provided")
            return JSONResponse(
                status_code=400,
                content={"error": "No file provided", "success": False}
            )
        # Spool the upload into a temporary .wav-named file on disk.
        print("๐ Creating temporary file...")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as spooled:
            shutil.copyfileobj(file.file, spooled)
            temp_path = spooled.name
        print(f"โ Temporary file created: {temp_path}")
        # Transcribe purely to obtain the language info object.
        print("๐ Detecting language...")
        segments, info = model.transcribe(temp_path)
        # Drain the lazy segment generator so transcription actually runs.
        list(segments)
        print(f"โ Language detected: {info.language} (probability: {info.language_probability:.2f})")
        # Remove the spooled file.
        os.unlink(temp_path)
        print(f"๐งน Temporary file cleaned: {temp_path}")
        payload = {
            "success": True,
            "language": info.language,
            "language_probability": info.language_probability
        }
        return JSONResponse(content=payload)
    except Exception as e:
        # On failure, remove the spooled file if it was created.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
            print(f"๐งน Temporary file cleaned after error: {temp_path}")
        error_msg = str(e)
        error_type = type(e).__name__
        print(f"โ Language detection error ({error_type}): {error_msg}")
        return JSONResponse(
            status_code=500,
            content={
                "error": error_msg,
                "error_type": error_type,
                "success": False
            }
        )
# For Hugging Face Spaces compatibility
if __name__ == "__main__":
    import uvicorn
    # Port 7860 is the port Hugging Face Spaces routes external traffic to;
    # 0.0.0.0 binds all interfaces inside the container.
    uvicorn.run(app, host="0.0.0.0", port=7860)