Spaces:
Build error
Build error
| import os | |
| import tempfile | |
| import subprocess | |
| from fastapi import FastAPI, UploadFile, File | |
| import whisper | |
| from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor | |
| import torch | |
| from datetime import timedelta | |
| from deep_translator import GoogleTranslator | |
| import ffmpeg | |
| # Create the module-level FastAPI application instance | |
| app = FastAPI() | |
def format_time(seconds):
    """Format an offset in seconds as an SRT timestamp (``HH:MM:SS,mmm``).

    Args:
        seconds: Offset from the start of the media, in seconds (int or float).

    Returns:
        The timestamp string. Sub-millisecond precision is truncated, negative
        offsets are clamped to ``00:00:00,000``, and the hours field keeps
        counting past 24 instead of wrapping around.
    """
    # Floor-dividing by one millisecond yields the whole duration as an int,
    # full days included. Reading ``timedelta.seconds`` alone would drop the
    # days component and wrap the timestamp at the 24-hour mark.
    total_ms = max(timedelta(seconds=seconds) // timedelta(milliseconds=1), 0)
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def extract_audio(video_path):
    """Extract the audio track of a video into a WAV file using ffmpeg.

    Args:
        video_path: Path of the input video file.

    Returns:
        Path of the WAV file, created in the system temp directory. The file
        is not removed by this function.
    """
    # Reserve a unique file per call: a single fixed name in the shared temp
    # directory would let overlapping calls overwrite each other's audio.
    fd, audio_path = tempfile.mkstemp(prefix="extracted_audio_", suffix=".wav")
    os.close(fd)
    # mkstemp has already created the (empty) file, so ffmpeg must be told it
    # may overwrite it.
    ffmpeg.input(video_path).output(audio_path, format='wav').run(overwrite_output=True)
    return audio_path
def transcribe_audio(audio_path):
    """Transcribe an audio file with the Whisper "base" model.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The "segments" entry of the transcription result, or an empty list
        when loading the model or transcribing fails (the error is printed).
    """
    segments = []
    try:
        # The model is loaded on every call, then applied to the audio file.
        recognizer = whisper.load_model("base")
        segments = recognizer.transcribe(audio_path)["segments"]
    except Exception as e:
        # Best effort: report the problem and fall through to the empty list.
        print(f"Error using whisper model: {e}")
    return segments
def translate_text(text):
    """Translate text from English to Arabic with GoogleTranslator.

    Args:
        text: English text to translate.

    Returns:
        The translation returned by the translator. Empty or whitespace-only
        strings are returned unchanged without calling the translator.
    """
    # Nothing to translate in a blank segment; skip the translator call.
    if isinstance(text, str) and not text.strip():
        return text
    translator = GoogleTranslator(source='en', target='ar')
    return translator.translate(text)
def create_srt(segments, output_path):
    """Write segments to an SRT subtitle file.

    Args:
        segments: Iterable of segments. Each one is either a mapping with
            'start', 'end', 'text' and 'translation' keys, or an object
            exposing ``start``, ``end``, ``text`` and optionally
            ``translation`` attributes.
        output_path: Path of the SRT file to create (overwritten if present).

    Each cue is numbered from 1 and shows the segment's translation. When a
    segment has no translation (missing or None), its original text is
    written instead, for both segment shapes.
    """
    # UTF-8 with BOM for compatibility
    with open(output_path, 'w', encoding='utf-8-sig') as srt_file:
        for index, segment in enumerate(segments, start=1):
            if hasattr(segment, 'get'):  # mapping-style segment
                start_time = segment.get('start', 0)
                end_time = segment.get('end', 0)
                text = segment.get('text', '')
                translation = segment.get('translation')
            else:  # attribute-style segment
                start_time = segment.start
                end_time = segment.end
                text = segment.text
                translation = getattr(segment, 'translation', None)
            # Same fallback for both shapes: never emit an empty cue just
            # because the translation is absent.
            if translation is None:
                translation = text
            srt_file.write(f"{index}\n")
            srt_file.write(f"{format_time(start_time)} --> {format_time(end_time)}\n")
            srt_file.write(f"{translation}\n\n")
def burn_subtitles(video_path, srt_path, output_path):
    """Burn an SRT subtitle file into a video with the ffmpeg CLI.

    Args:
        video_path: Path of the input video.
        srt_path: Path of the SRT file to render onto the frames.
        output_path: Path of the video to write (overwritten via ``-y``).

    Returns:
        ``output_path`` on success, or None when ffmpeg exits with a non-zero
        status (the error is printed).
    """
    font_path = "/usr/share/fonts/truetype/Amiri-Regular.ttf"  # Path to Amiri font
    # An ASS style's FontName is a font family name, not a file path, so the
    # family is passed by name and the renderer is pointed at the directory
    # holding the font file through the filter's fontsdir option.
    fonts_dir = os.path.dirname(font_path)
    style = (
        "FontName=Amiri,FontSize=24,PrimaryColour=&HFFFFFF,"
        "OutlineColour=&H000000,BorderStyle=3,Alignment=2,Encoding=1"
    )
    subtitle_filter = (
        f"subtitles='{srt_path}':fontsdir='{fonts_dir}':force_style='{style}'"
    )
    cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-vf', subtitle_filter,
        # NOTE(review): -sub_charenc follows -i here, so it may not affect how
        # the subtitles filter decodes the SRT file - confirm.
        '-sub_charenc', 'UTF-8',
        '-c:v', 'libx264', '-crf', '18',
        '-c:a', 'copy',
        output_path
    ]
    try:
        subprocess.run(cmd, check=True)
        return output_path
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg error: {e}")
        return None
def process_video(video_path):
    """Run the full subtitle pipeline on one video.

    Steps: extract the audio, transcribe it, translate each segment, write an
    SRT file, and burn the subtitles into a new video. Both outputs are
    written to the system temp directory and named after the input file.

    Args:
        video_path: Path of the video to process.

    Returns:
        A ``(result_path, srt_path)`` tuple. ``result_path`` is whatever
        ``burn_subtitles`` returns, which is None when ffmpeg fails.
    """
    temp_dir = tempfile.gettempdir()
    file_name = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = extract_audio(video_path)
    segments = transcribe_audio(audio_path)

    translated_segments = []
    for segment in segments:
        if hasattr(segment, 'text'):
            # Attribute-style segment: attach the translation to the object.
            segment.translation = translate_text(segment.text)
            translated = segment
        else:
            # Mapping-style segment: a plain dict rejects attribute
            # assignment, so store the translation under a key on a copy.
            translated = dict(segment)
            translated['translation'] = translate_text(translated.get('text', ''))
        translated_segments.append(translated)

    srt_path = os.path.join(temp_dir, f"{file_name}.srt")
    create_srt(translated_segments, srt_path)
    output_path = os.path.join(temp_dir, f"{file_name}_translated.mp4")
    result_path = burn_subtitles(video_path, srt_path, output_path)
    return result_path, srt_path
# API endpoint to process video
# NOTE(review): no route decorator (e.g. @app.post) is visible on this
# coroutine, so it may not be registered on `app` - confirm.
async def process_video_endpoint(file: UploadFile = File(...)):
    """Save an uploaded video to the temp directory and process it.

    Args:
        file: The uploaded video.

    Returns:
        A dict with "video_url" and "srt_url" holding the paths returned by
        ``process_video``.
    """
    temp_dir = tempfile.gettempdir()
    # The client controls file.filename. Keep only its last path component so
    # a name such as "../../x" cannot place the file outside temp_dir.
    client_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(client_name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload"
    file_path = os.path.join(temp_dir, safe_name)
    # Copy in fixed-size chunks rather than holding the whole upload in memory.
    with open(file_path, "wb") as f:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            f.write(chunk)
    result_path, srt_path = process_video(file_path)
    return {"video_url": result_path, "srt_url": srt_path}