# app.py — video subtitle translation service:
# Whisper transcription -> English-to-Arabic translation -> SRT -> FFmpeg burn-in.
import os
import tempfile
import subprocess
from fastapi import FastAPI, UploadFile, File
import whisper
from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor
import torch
from datetime import timedelta
from deep_translator import GoogleTranslator
import ffmpeg
# FastAPI application instance; routes are registered on it below.
app = FastAPI()
def format_time(seconds):
    """Convert a duration in seconds to SRT timestamp format (HH:MM:SS,mmm).

    Works from total milliseconds so durations of 24 hours or more render
    correctly (the previous version used timedelta.seconds, which silently
    drops whole days and wrapped long timestamps back to 0).
    """
    total_ms = int(seconds * 1000)
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, millis = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def extract_audio(video_path):
    """Extract the audio track of *video_path* into a temporary WAV file.

    Returns the path of the new WAV file. A unique temp file is created per
    call (the previous fixed name 'extracted_audio.wav' collided between
    concurrent requests, and ffmpeg refuses to overwrite an existing file
    without -y).
    """
    fd, audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)  # ffmpeg writes the file itself; we only need the name
    # overwrite_output() adds -y: mkstemp already created the (empty) file,
    # so ffmpeg must be allowed to overwrite it.
    ffmpeg.input(video_path).output(audio_path, format='wav').overwrite_output().run()
    return audio_path
def transcribe_audio(audio_path):
    """Transcribe *audio_path* with the Whisper 'base' model.

    Returns the list of segment dicts from Whisper's result, or an empty
    list on any failure (best-effort: callers then produce no subtitles).
    The model is loaded once and cached on the function, instead of being
    re-downloaded/re-loaded on every request.
    """
    try:
        model = getattr(transcribe_audio, "_model", None)
        if model is None:
            model = whisper.load_model("base")
            transcribe_audio._model = model  # cache for subsequent calls
        result = model.transcribe(audio_path)
        return result["segments"]
    except Exception as e:
        # Deliberate best-effort: log and return no segments rather than crash.
        print(f"Error using whisper model: {e}")
        return []
def translate_text(text):
    """Translate *text* from English to Arabic using Google Translate."""
    return GoogleTranslator(source='en', target='ar').translate(text)
def create_srt(segments, output_path):
    """Write *segments* to *output_path* as an SRT subtitle file.

    Segments may be dict-like (Whisper output) or attribute-style objects;
    the translated text is used as the caption (falling back to the original
    text for attribute-style segments without a translation). The file is
    written as UTF-8 with BOM for player compatibility.
    """
    def _caption_fields(seg):
        # Normalize the two segment shapes to (start, end, caption).
        if hasattr(seg, 'get'):
            return seg.get('start', 0), seg.get('end', 0), seg.get('translation', '')
        return seg.start, seg.end, getattr(seg, 'translation', seg.text)

    with open(output_path, 'w', encoding='utf-8-sig') as srt_file:
        for idx, seg in enumerate(segments, start=1):
            start, end, caption = _caption_fields(seg)
            srt_file.write(
                f"{idx}\n"
                f"{format_time(start)} --> {format_time(end)}\n"
                f"{caption}\n\n"
            )
def burn_subtitles(video_path, srt_path, output_path):
    """Burn *srt_path* subtitles into *video_path*, writing *output_path*.

    Returns *output_path* on success, None if ffmpeg fails. Uses libass
    styling with the Amiri font for Arabic text.

    Fixes vs. the previous version:
    - FontName must be a font *family name*, not a file path; the directory
      containing the .ttf is passed via the filter's fontsdir option instead.
    - Removed '-sub_charenc': it is an input option (valid only before -i for
      a subtitle input) and had no effect where it was placed; the subtitles
      filter reads the SRT as UTF-8 by default.
    """
    fonts_dir = "/usr/share/fonts/truetype"  # directory holding Amiri-Regular.ttf
    style = ("FontName=Amiri,FontSize=24,PrimaryColour=&HFFFFFF,"
             "OutlineColour=&H000000,BorderStyle=3,Alignment=2,Encoding=1")
    cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-vf', f"subtitles='{srt_path}':fontsdir='{fonts_dir}':force_style='{style}'",
        '-c:v', 'libx264', '-crf', '18',
        '-c:a', 'copy',
        output_path
    ]
    try:
        subprocess.run(cmd, check=True)
        return output_path
    except subprocess.CalledProcessError as e:
        # Best-effort: report the failure and signal it with None.
        print(f"FFmpeg error: {e}")
        return None
def process_video(video_path):
    """Run the full pipeline on *video_path*.

    Steps: extract audio, transcribe, translate each segment to Arabic,
    write an SRT file, then burn the subtitles into a new video.

    Returns a tuple ``(result_path, srt_path)`` where ``result_path`` is the
    burned video path (or None if ffmpeg failed) and ``srt_path`` is the
    generated subtitle file.
    """
    temp_dir = tempfile.gettempdir()
    file_name = os.path.splitext(os.path.basename(video_path))[0]

    audio_path = extract_audio(video_path)
    segments = transcribe_audio(audio_path)

    translated_segments = []
    for segment in segments:
        # Whisper returns plain dicts: setting an attribute on them raised
        # AttributeError before. Store the translation under a key for
        # dict-like segments and as an attribute otherwise.
        if hasattr(segment, 'get'):
            text = segment.get('text', '')
            segment['translation'] = translate_text(text)
        else:
            text = getattr(segment, 'text', '')
            segment.translation = translate_text(text)
        translated_segments.append(segment)

    srt_path = os.path.join(temp_dir, f"{file_name}.srt")
    create_srt(translated_segments, srt_path)

    output_path = os.path.join(temp_dir, f"{file_name}_translated.mp4")
    result_path = burn_subtitles(video_path, srt_path, output_path)
    return result_path, srt_path
# API endpoint to process video
@app.post("/process_video/")
async def process_video_endpoint(file: UploadFile = File(...)):
    """Accept an uploaded video and return paths to the subtitled video and SRT.

    The upload is saved to the system temp directory and handed to
    process_video(). Returns a JSON object with 'video_url' (None if the
    burn step failed) and 'srt_url'.
    """
    temp_dir = tempfile.gettempdir()
    # basename() strips client-supplied directory components — file.filename
    # is untrusted and could otherwise be used for path traversal.
    safe_name = os.path.basename(file.filename or "upload.mp4")
    file_path = os.path.join(temp_dir, safe_name)
    with open(file_path, "wb") as f:
        f.write(await file.read())
    result_path, srt_path = process_video(file_path)
    return {"video_url": result_path, "srt_url": srt_path}