# Pasted from a hosting dashboard status page ("Spaces"): the deployment
# reported "Runtime error" at the time this source was captured.
# app.py
import os
import tempfile
import time
from typing import Optional

import psutil
import whisper
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import FileResponse, Response
app = FastAPI()

# Process start timestamp; the status endpoint reports uptime relative to this.
start_time = time.time()

# Writable directory for cached Whisper weights — the default home-directory
# cache may be read-only on managed hosts.
MODEL_CACHE_DIR = "./model_cache"
os.makedirs(MODEL_CACHE_DIR, exist_ok=True)
@app.on_event("startup")
def load_model() -> None:
    """Load the Whisper model once at application startup.

    Stores the model on ``app.state.model`` so request handlers can reach it.
    Registered as a FastAPI startup handler — the original defined this
    function but never called it, so the model was never loaded.

    Raises:
        RuntimeError: if the model cannot be downloaded or initialized.
    """
    try:
        app.state.model = whisper.load_model(
            "large",
            download_root=MODEL_CACHE_DIR,  # writable cache dir (see above)
            in_memory=True,  # keep weights in RAM instead of mmap'ing the checkpoint
        )
        print(f"Model loaded successfully on device: {app.state.model.device}")
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        # Chain the original cause so the real failure isn't lost.
        raise RuntimeError("Failed to load Whisper model") from e
def format_time(seconds: float) -> str:
    """Convert a time offset in seconds to the SRT timestamp ``HH:MM:SS,mmm``.

    Rounds to the nearest millisecond instead of truncating: the original
    truncated the fractional part, so a value like 3.007 (stored in binary as
    3.00699999...) lost a millisecond and rendered as ``00:00:03,006``.

    Args:
        seconds: non-negative offset in seconds; negative values clamp to 0.

    Returns:
        The SRT-formatted timestamp string.
    """
    total_ms = max(0, round(seconds * 1000))
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, millis = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def generate_srt(transcript: dict) -> str:
    """Generate SRT subtitle text from a Whisper transcript dict.

    Emits one cue per word when word-level timestamps are present (i.e. the
    transcription ran with ``word_timestamps=True``). Falls back to one cue
    per segment when a segment has no ``words`` key, so the output is never
    silently empty for segment-only transcripts — the original produced an
    empty string in that case.

    Args:
        transcript: Whisper result dict with a ``segments`` list.

    Returns:
        The full SRT document as a single string.
    """
    cues = []
    index = 1
    for segment in transcript.get('segments', []):
        words = segment.get('words')
        if words:
            # One SRT cue per word.
            entries = [(w['start'], w['end'], w['word'].strip()) for w in words]
        else:
            # Fallback: one cue covering the whole segment.
            entries = [(segment['start'], segment['end'],
                        segment.get('text', '').strip())]
        for cue_start, cue_end, text in entries:
            cues.append(
                f"{index}\n"
                f"{format_time(cue_start)} --> {format_time(cue_end)}\n"
                f"{text}\n\n"
            )
            index += 1
    return "".join(cues)
# NOTE(review): route path inferred from the docstring — confirm against clients.
@app.post("/transcribe")
async def transcribe_audio(
    file: UploadFile = File(..., description="Audio/video file to transcribe"),
    task_token: Optional[str] = None,
):
    """Transcribe an uploaded audio/video file and return SRT subtitles.

    The upload is written to a secure temporary file (the client-supplied
    filename is untrusted input and must not be used as a filesystem path),
    transcribed with word timestamps, and the SRT text is returned directly
    in the response body. The original wrote the SRT to disk and deleted it
    in ``finally`` — which runs before ``FileResponse`` streams the file, so
    every successful request crashed; it also referenced ``srt_file`` in
    ``finally`` before assignment on early failures, masking the real error
    with a ``NameError``.

    Args:
        file: the audio/video upload to transcribe.
        task_token: unused; kept for interface compatibility.

    Raises:
        HTTPException: 503 if the model is not loaded; 500 on transcription failure.
    """
    if not hasattr(app.state, "model"):
        raise HTTPException(status_code=503, detail="Model not loaded")

    # Keep only the extension from the client filename; never trust the rest.
    suffix = os.path.splitext(file.filename or "")[1]
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(await file.read())
            temp_path = tmp.name

        result = app.state.model.transcribe(temp_path, word_timestamps=True)
        srt_content = generate_srt(result)

        return Response(
            content=srt_content,
            media_type='application/x-subrip',
            headers={
                "Content-Disposition": f'attachment; filename="{file.filename}.srt"'
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # temp_path stays None if the upload failed before the file was created.
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
# NOTE(review): route path inferred from the function's purpose — confirm.
@app.get("/status")
async def get_status():
    """Report server health: uptime, memory usage, and model readiness.

    The original defined this handler without a route decorator, so it was
    never exposed.
    """
    process = psutil.Process(os.getpid())
    return {
        "status": "OK",
        "uptime": round(time.time() - start_time, 2),
        "memory_usage": f"{process.memory_info().rss / 1024 / 1024:.2f} MB",
        "model_loaded": hasattr(app.state, "model"),
        # NOTE(review): open-connection count is only a rough proxy for
        # in-flight requests; psutil >= 6 renames this to net_connections().
        "active_requests": len(process.connections()),
    }
# NOTE(review): route path inferred from the function's purpose — confirm.
@app.get("/model")
async def get_model_status():
    """Report which model is loaded, its device, and its parameter count."""
    if not hasattr(app.state, "model"):
        return {"model_status": "Not loaded"}
    return {
        "model_name": "Whisper large",
        # str(): the device attribute is a torch.device, which is not
        # JSON-serializable and would break response encoding.
        "device": str(app.state.model.device),
        "parameters": f"{sum(p.numel() for p in app.state.model.parameters()):,}",
    }
if __name__ == "__main__":
    # Run the ASGI app directly when executed as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)