from faster_whisper import WhisperModel, BatchedInferencePipeline import time import os import shutil import yt_dlp import subprocess from typing import Optional import logging from fastapi import FastAPI, File, UploadFile, HTTPException, Form from fastapi.responses import FileResponse import os, time from fastapi.middleware.cors import CORSMiddleware from pathlib import Path import zipfile import tempfile app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], ) logging.basicConfig() logging.getLogger("faster_whisper").setLevel(logging.DEBUG) def youtube_download_video(VIDEO_URL, DOWNLOAD_DIR, output_template): URLS = [VIDEO_URL] os.makedirs(DOWNLOAD_DIR, exist_ok=True) ydl_opts = { 'outtmpl': output_template, 'format': 'bestvideo[height<=1080]+bestaudio/best', 'merge_output_format': 'mp4', 'verbose': True } with yt_dlp.YoutubeDL(ydl_opts) as ydl: try: print(f"Downloading from YouTube: {URLS[0]}") info = ydl.extract_info(URLS[0], download=True) if not info: return "Error downloading youtube video" final_filepath = None if 'requested_downloads' in info and info['requested_downloads']: final_filepath = info['requested_downloads'][0]['filepath'] elif '_filename' in info: final_filepath = info['_filename'] else: print("Warning: yt-dlp did not provide a clear filepath. Attempting to construct.") if 'title' in info and 'ext' in info: guessed_filename = f"{info['title']}.{info['ext']}" guessed_path = os.path.join(DOWNLOAD_DIR, guessed_filename) if os.path.exists(guessed_path): final_filepath = guessed_path else: print(f"Could not determine downloaded file path for {URLS[0]}.") except Exception as e: print(f"An error occurred during YouTube download: {e}") final_filepath = None finally: return final_filepath def local_audio_file(DOWNLOAD_DIR, AUDIO_FILE): try: potential_path = os.path.join(DOWNLOAD_DIR, AUDIO_FILE) if os.path.exists(potential_path): final_filepath = potential_path print(f"Using local file: {final_filepath}") elif os.path.exists(AUDIO_FILE): final_filepath = AUDIO_FILE print(f"Using local file: {final_filepath}") else: print(f"Local file not found at '{potential_path}' or as '{AUDIO_FILE}'") final_filepath = None except Exception as e: final_path = None print(f"Error finding file:{e}") finally: return final_filepath def create_subtitle_chunks(segments, max_words=8, max_duration=5.0): subtitle_chunks = [] for segment in segments: if hasattr(segment, 'words') and segment.words: current_chunk = [] chunk_start = segment.words[0].start for i, word in enumerate(segment.words): current_chunk.append(word.word) if (len(current_chunk) >= max_words or word.end - chunk_start >= max_duration): text = ''.join(current_chunk).strip() subtitle_chunks.append({ 'start': chunk_start, 'end': word.end, 'text': text }) current_chunk = [] if i + 1 < len(segment.words): chunk_start = segment.words[i + 1].start if current_chunk: text = ''.join(current_chunk).strip() subtitle_chunks.append({ 'start': chunk_start, 'end': segment.words[-1].end, 'text': text }) else: subtitle_chunks.append({ 'start': segment.start, 'end': segment.end, 'text': segment.text }) return subtitle_chunks def format_time(seconds): seconds -= 0.2 hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds_remainder = seconds % 60 milliseconds = int((seconds_remainder - int(seconds_remainder)) * 1000) return f"{hours:02d}:{minutes:02d}:{int(seconds_remainder):02d},{milliseconds:03d}" def add_subtitles(media_path): base, ext = os.path.splitext(os.path.basename(media_path)) dir_path = os.path.dirname(media_path) final_output = os.path.join(dir_path, f"{base}_subtitled.mp4") subtitle_file = os.path.join(dir_path, f"{base}.srt") if not os.path.exists(subtitle_file): print(f"Error: Subtitle file not found at {subtitle_file}") return video_formats = ['.mp4', '.webm', '.mpeg'] try: if ext.lower() in video_formats: print('Found video file.') temp_output = os.path.join(dir_path, f"{base}_temp.mp4") cmd = ['ffmpeg', '-i', media_path, '-i', subtitle_file, '-c', 'copy', '-c:s', 'mov_text', temp_output, '-y'] subprocess.run(cmd, check=True, capture_output=True) if ext.lower() == ".mp4": os.remove(media_path) os.rename(temp_output, media_path) else: os.rename(temp_output, final_output) else: print('Found audio file.') temp_video = os.path.join(dir_path, f"{base}_temp.mp4") cmd1 = ['ffmpeg', '-f', 'lavfi', '-i', 'color=c=black:s=1280x720:r=5', '-i', media_path, '-c:a', 'copy', '-shortest', temp_video, '-y'] subprocess.run(cmd1, check=True, capture_output=True) cmd2 = ['ffmpeg', '-i', temp_video, '-i', subtitle_file, '-c', 'copy', '-c:s', 'mov_text', final_output, '-y'] subprocess.run(cmd2, check=True, capture_output=True) os.remove(temp_video) return final_output except subprocess.CalledProcessError as e: print(f"FFmpeg Error: {e.stderr.decode()}") except Exception as e: print(f"An error occurred: {e}") def clean_files(path): if os.path.isdir(path): shutil.rmtree(path) print("Log: Cleaned all files") @app.get('/test') async def test_endpoint(): return {"message": "FastAPI is working!"} @app.post('/generate-subtitles') async def generate_subtitles( file: Optional[UploadFile] = File(None), youtube_url: Optional[str] = Form(None) ): upload_dir = '/tmp/audio' os.makedirs(upload_dir, exist_ok=True) if file: file_path = os.path.join(upload_dir, file.filename) with open(file_path, "wb") as f: f.write(await file.read()) final_filepath = file_path print(f"Uploaded file saved to {final_filepath}") elif youtube_url: output_template = os.path.join(upload_dir, "%(title)s.%(ext)s") final_filepath = youtube_download_video(youtube_url, upload_dir, output_template) else: raise HTTPException(status_code=400, detail="You must provide either a file or youtube URL.") if final_filepath and os.path.exists(final_filepath): print(f"Processing audio file: {final_filepath}") print(f"File size: {os.path.getsize(final_filepath) / 1024 / 1024:.2f} MB") base_name = os.path.basename(final_filepath) file_name_without_extension, _ = os.path.splitext(base_name) FILE_NAME_FOR_TXT = file_name_without_extension model_size = "small" print(f"\nLoading Whisper model: {model_size}...") try: model = WhisperModel( model_size, device="cpu", compute_type="int8", download_root="/app/models" ) batched_model = BatchedInferencePipeline(model=model) print("Model loaded successfully.") print("\nStarting transcription...") start_time = time.time() segments, info = batched_model.transcribe( final_filepath, batch_size=8, beam_size=5, word_timestamps=True ) os.makedirs(upload_dir, exist_ok=True) transcript_filename = os.path.join(upload_dir, f"{FILE_NAME_FOR_TXT}.srt") subtitle_chunks = create_subtitle_chunks(segments, max_words=12, max_duration=4.0) full_transcript_text = [] for chunk in subtitle_chunks: start_time_formatted = format_time(chunk['start']) end_time_formatted = format_time(chunk['end']) line = f"{start_time_formatted} --> {end_time_formatted}\n{chunk['text']}" full_transcript_text.append(line) with open(transcript_filename, "w", encoding="utf-8") as f: count = 1 for line in full_transcript_text: f.write(f"{count}\n{line}\n\n") count += 1 end_time = time.time() processed_time = end_time - start_time print(f"\nTranscription complete and saved to {transcript_filename}.") print(f"Processed in {processed_time:.2f} seconds") video_output = Path(final_filepath).resolve() subtitle_output = Path(transcript_filename).resolve() files_to_send = [video_output, subtitle_output] with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp: with zipfile.ZipFile(tmp, "w", zipfile.ZIP_DEFLATED) as zf: for f in files_to_send: zf.write(f, arcname=f.name) tmp_path = tmp.name return FileResponse(tmp_path, media_type="application/zip", filename="subtitles.zip") except Exception as e: raise HTTPException(status_code=400, detail=str(e)) finally: if 'model' in locals(): del model if 'batched_model' in locals(): del batched_model print("Model resources released.") clean_files(upload_dir) import gc gc.collect() else: raise HTTPException(status_code=400, detail="Failed to process the file.")