"""FastAPI service that burns Arabic-translated subtitles into an uploaded video.

Pipeline: extract audio (ffmpeg) -> transcribe (Whisper) -> translate
(GoogleTranslator, English to Arabic) -> write SRT -> burn subtitles (ffmpeg).
"""
import functools
import os
import subprocess
import tempfile
import threading
from datetime import timedelta  # noqa: F401  (kept: existing file-level import)

import ffmpeg
import torch  # noqa: F401  (kept: existing file-level import)
import whisper
from deep_translator import GoogleTranslator
from fastapi import FastAPI, File, UploadFile
from fastapi.concurrency import run_in_threadpool
from transformers import (  # noqa: F401  (kept: existing file-level imports)
    AutoModelForSpeechSeq2Seq,
    AutoProcessor,
    pipeline,
)

# Initialize FastAPI app
app = FastAPI()

WHISPER_MODEL_NAME = "base"
SUBTITLE_FONT_PATH = "/usr/share/fonts/truetype/Amiri-Regular.ttf"  # Path to Amiri font
# NOTE(review): libass selects fonts by family name; "Amiri" is assumed to be
# the family name stored in Amiri-Regular.ttf -- confirm with `fc-scan`.
SUBTITLE_FONT_NAME = "Amiri"

# One model instance is shared by all requests, so transcriptions are
# serialized. NOTE(review): Whisper is not documented as thread-safe.
_WHISPER_LOCK = threading.Lock()


def _segment_field(segment, name, default=None):
    """Read *name* from a segment that is either dict-like or an attribute object."""
    if hasattr(segment, "get"):
        return segment.get(name, default)
    return getattr(segment, name, default)


def format_time(seconds):
    """Convert a number of seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    Negative inputs are clamped to zero. Hours are not wrapped at 24, so
    durations of a day or longer keep counting (e.g. ``25:00:00,000``).
    """
    total_ms = max(0, int(round(float(seconds) * 1000)))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, milliseconds = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"


def extract_audio(video_path):
    """Extract the audio track of *video_path* to a WAV file using ffmpeg.

    Returns the path of a newly created temporary WAV file; the caller is
    responsible for deleting it. Errors raised by ffmpeg propagate.
    """
    # A unique file per call: a fixed name in the shared temp directory would
    # be clobbered by a concurrent request.
    fd, audio_path = tempfile.mkstemp(prefix="extracted_audio_", suffix=".wav")
    os.close(fd)
    try:
        # mkstemp already created the file, so ffmpeg must be allowed to overwrite it.
        ffmpeg.input(video_path).output(audio_path, format="wav").run(
            overwrite_output=True
        )
    except Exception:
        if os.path.exists(audio_path):
            os.remove(audio_path)
        raise
    return audio_path


@functools.lru_cache(maxsize=1)
def _load_whisper_model():
    """Load the Whisper model once and reuse it for later calls."""
    return whisper.load_model(WHISPER_MODEL_NAME)


def transcribe_audio(audio_path):
    """Transcribe *audio_path* with Whisper and return the list of segments.

    Best effort: on any failure the error is printed and ``[]`` is returned.
    """
    try:
        with _WHISPER_LOCK:
            result = _load_whisper_model().transcribe(audio_path)
        return result["segments"]
    except Exception as e:
        print(f"Error using whisper model: {e}")
        return []


def translate_text(text):
    """Translate *text* from English to Arabic.

    Empty or whitespace-only input is returned unchanged without calling the
    translator; if the translator yields nothing, the original text is returned.
    """
    if not text or not text.strip():
        return text
    translated = GoogleTranslator(source="en", target="ar").translate(text)
    return translated if translated else text


def create_srt(segments, output_path):
    """Write *segments* to *output_path* as an SRT file.

    Each segment may be dict-like or an object with ``start``, ``end``,
    ``text`` and optional ``translation``. The translation is written as the
    caption, falling back to the original text when it is missing. The file is
    encoded as UTF-8 with BOM for player compatibility.
    """
    with open(output_path, "w", encoding="utf-8-sig") as srt_file:
        for index, segment in enumerate(segments, start=1):
            start_time = _segment_field(segment, "start", 0)
            end_time = _segment_field(segment, "end", 0)
            text = _segment_field(segment, "text", "")
            caption = _segment_field(segment, "translation", None)
            if caption is None:
                caption = text  # Use the original text if no translation
            caption = str(caption).strip()
            srt_file.write(f"{index}\n")
            srt_file.write(f"{format_time(start_time)} --> {format_time(end_time)}\n")
            srt_file.write(f"{caption}\n\n")


def _escape_filter_path(path):
    """Escape *path* for use inside a single-quoted ffmpeg filter argument.

    Backslash, colon and single quote are special to the filter option parser,
    and a single quote additionally has to leave and re-enter the surrounding
    quoted section of the filtergraph.
    """
    return (
        path.replace("\\", "\\\\")
        .replace(":", "\\:")
        .replace("'", "\\'\\''")
    )


def burn_subtitles(video_path, srt_path, output_path):
    """Burn the subtitles in *srt_path* into *video_path* using ffmpeg.

    Returns *output_path* on success, or ``None`` (after printing the error)
    when ffmpeg exits with a non-zero status.
    """
    style = ",".join(
        [
            f"FontName={SUBTITLE_FONT_NAME}",
            "FontSize=24",
            "PrimaryColour=&HFFFFFF",
            "OutlineColour=&H000000",
            "BorderStyle=3",
            "Alignment=2",
            "Encoding=1",
        ]
    )
    fonts_dir = os.path.dirname(SUBTITLE_FONT_PATH)
    video_filter = (
        f"subtitles='{_escape_filter_path(srt_path)}'"
        f":fontsdir='{_escape_filter_path(fonts_dir)}'"
        f":force_style='{style}'"
    )
    # -sub_charenc is a decoder (input) option and does not belong among the
    # output options; the SRT is written as UTF-8, so no charset override is set.
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vf", video_filter,
        "-c:v", "libx264",
        "-crf", "18",
        "-c:a", "copy",
        output_path,
    ]
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg error: {e}")
        return None
    return output_path


def process_video(video_path, output_dir=None):
    """Run the full pipeline on *video_path*.

    Extracts audio, transcribes it, translates every segment, writes an SRT
    file and burns it into a copy of the video.

    *output_dir* is where the SRT and the subtitled video are written; it
    defaults to the system temporary directory.

    Returns ``(result_path, srt_path)``; ``result_path`` is ``None`` when
    burning the subtitles failed.
    """
    if output_dir is None:
        output_dir = tempfile.gettempdir()
    file_name = os.path.splitext(os.path.basename(video_path))[0]

    audio_path = extract_audio(video_path)
    try:
        segments = transcribe_audio(audio_path)
    finally:
        # The WAV file is only an intermediate artifact.
        if os.path.exists(audio_path):
            os.remove(audio_path)

    translated_segments = []
    for segment in segments:
        text = _segment_field(segment, "text", "") or ""
        translated_text = translate_text(text)
        if hasattr(segment, "get"):
            # Dict segments do not accept attribute assignment, so the
            # translation is stored under a key on a copy.
            translated_segments.append(dict(segment, translation=translated_text))
        else:
            segment.translation = translated_text
            translated_segments.append(segment)

    srt_path = os.path.join(output_dir, f"{file_name}.srt")
    create_srt(translated_segments, srt_path)

    output_path = os.path.join(output_dir, f"{file_name}_translated.mp4")
    result_path = burn_subtitles(video_path, srt_path, output_path)
    return result_path, srt_path


# API endpoint to process video
@app.post("/process_video/")
async def process_video_endpoint(file: UploadFile = File(...)):
    """Accept an uploaded video and return paths of the subtitled video and SRT.

    The response is ``{"video_url": ..., "srt_url": ...}`` holding the
    filesystem paths returned by ``process_video``.
    """
    # The client controls file.filename: keep only its final component so the
    # upload cannot be written outside the working directory.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "upload"

    # A per-request directory keeps equally named uploads from colliding.
    work_dir = tempfile.mkdtemp(prefix="process_video_")
    file_path = os.path.join(work_dir, safe_name)
    with open(file_path, "wb") as f:
        f.write(await file.read())

    # The pipeline is blocking and long-running; run it off the event loop.
    result_path, srt_path = await run_in_threadpool(
        process_video, file_path, work_dir
    )
    return {"video_url": result_path, "srt_url": srt_path}