# myTools/src/subtitle_extractor.py — deploy version v1.2.0 (commit 3c11817)
"""Utility functions for extracting audio, transcribing and merging subtitles."""
from __future__ import annotations
import logging
import os
import subprocess
from dataclasses import dataclass
from typing import List, Optional
from pydub import AudioSegment
# MoviePy is an optional dependency used when extracting audio. It is imported
# lazily to avoid issues when running in environments where it is not
# available (for instance during unit tests).
# faster-whisper is optional: fall back to ``None`` so this module can still
# be imported (e.g. during unit tests) without the dependency installed;
# transcribe_audio() raises a clear RuntimeError instead of an ImportError.
try:
    from faster_whisper import WhisperModel
except ImportError:  # pragma: no cover - optional dependency
    WhisperModel = None
# NOTE(review): this configures the root logger at DEBUG at import time,
# which affects the whole application — consider moving it to the entry point.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
# Maximum upload size accepted by the OpenAI audio transcription endpoint.
MAX_OPENAI_AUDIO_SIZE = 25 * 1024 * 1024  # 25 MB
def format_timestamp(seconds: float) -> str:
    """Format *seconds* as an SRT timestamp (``HH:MM:SS,mmm``)."""
    whole = int(seconds)
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    # Milliseconds come from the fractional part (truncated, not rounded).
    millis = int((seconds - whole) * 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"
def extract_audio(video_path: str, output_dir: str) -> str:
    """Extract the audio track of *video_path* into ``<output_dir>/<name>.wav``.

    Parameters:
        video_path: path to the source video file.
        output_dir: directory for the WAV file (created if necessary).

    Returns:
        The path of the written audio file.

    Raises:
        FileNotFoundError: if *video_path* does not exist.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(video_path)
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = os.path.join(output_dir, f"{base_name}.wav")
    # Import here so tests that do not require MoviePy can run without the
    # dependency installed.
    from moviepy.editor import VideoFileClip

    clip = VideoFileClip(video_path)
    try:
        clip.audio.write_audiofile(audio_path, logger=None)
    finally:
        # Always release the clip's file handles, even if extraction fails
        # (the original leaked them on error).
        clip.close()
    return audio_path
@dataclass
class SubtitleLine:
    """A single subtitle cue: *text* shown from *start* to *end* (seconds)."""

    # Start time of the cue, in seconds from the beginning of the audio.
    start: float
    # End time of the cue, in seconds.
    end: float
    # Caption text for this cue.
    text: str
def _segments_to_srt(segments: List[SubtitleLine]) -> str:
    """Render *segments* as the body of an SRT file (1-based cue numbers)."""
    parts: List[str] = []
    for number, cue in enumerate(segments, start=1):
        window = f"{format_timestamp(cue.start)} --> {format_timestamp(cue.end)}"
        # Cue number, timing line, text line, then a blank separator line.
        parts.extend((str(number), window, cue.text.strip(), ""))
    return "\n".join(parts)
def _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset):
    """Export *seg* (a pydub ``AudioSegment``) to MP3 and transcribe it via OpenAI.

    If the exported MP3 exceeds ``MAX_OPENAI_AUDIO_SIZE`` the segment is split
    in half and each half is processed recursively, so every upload stays
    under the API limit.

    Parameters:
        seg: pydub audio segment to transcribe.
        idx: label used in temp-file names and log messages.
        audio_path: original audio path (kept for interface compatibility; unused).
        openai: the imported ``openai`` module, already configured with an API key.
        words_per_sub: number of words per generated subtitle cue.
        time_offset: seconds to add to every generated cue's timestamps.

    Returns:
        ``(subtitle_lines, plain_texts)`` — a list of SubtitleLine shifted by
        *time_offset* and a list of raw transcript strings.
    """
    import tempfile

    segment_list = []
    txt_list = []
    # delete=False so the file can be re-opened for upload after export.
    with tempfile.NamedTemporaryFile(suffix=f"_part{idx}.mp3", delete=False) as temp_file:
        temp_name = temp_file.name
    try:
        seg.export(temp_name, format="mp3")
        temp_size = os.path.getsize(temp_name)
        logging.debug(f"Segmento {idx}: dimensione {temp_size} byte (MP3)")
        if temp_size > MAX_OPENAI_AUDIO_SIZE:
            logging.info(f"Segmento {idx} ancora troppo grande, suddivisione ricorsiva...")
            # Still too large: split in half and recurse on each part; the
            # second half starts after the first half's duration.
            mid = len(seg) // 2
            seg1 = seg[:mid]
            seg2 = seg[mid:]
            segs1, txts1 = _export_and_transcribe_segment(seg1, f"{idx}a", audio_path, openai, words_per_sub, time_offset)
            segs2, txts2 = _export_and_transcribe_segment(seg2, f"{idx}b", audio_path, openai, words_per_sub, time_offset + seg1.duration_seconds)
            segment_list.extend(segs1)
            segment_list.extend(segs2)
            txt_list.extend(txts1)
            txt_list.extend(txts2)
        else:
            with open(temp_name, "rb") as audio_file:
                result = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="json",
                )
            words = result.text.split()
            plain = result.text.strip()
            txt_list.append(plain)
            # The JSON response carries no word timings: approximate them by
            # emitting fixed 3-second cues of *words_per_sub* words each,
            # starting at *time_offset*.
            start = time_offset
            step = 3.0
            for i in range(0, len(words), words_per_sub):
                end = start + step
                text = " ".join(words[i : i + words_per_sub])
                segment_list.append(SubtitleLine(start=start, end=end, text=text))
                start = end
    finally:
        # Guarantee cleanup of the temp file on every path — the original
        # could leak it when the recursive branch raised.
        try:
            os.remove(temp_name)
        except OSError:
            pass
    return segment_list, txt_list
def transcribe_audio(
    audio_path: str,
    library: str = "faster_whisper",
    api_key: Optional[str] = None,
    model_size: str = "base",
    words_per_sub: int = 7,
) -> tuple[str, str]:
    """Transcribe *audio_path* and return (SRT content, plain text content).

    Parameters:
        audio_path: path of the audio file to transcribe.
        library: ``"OpenAI Whisper"`` selects the OpenAI API; any other value
            selects the local faster-whisper model.
        api_key: OpenAI API key; required only for the OpenAI path.
        model_size: faster-whisper model size (ignored on the OpenAI path).
        words_per_sub: words per generated cue on the OpenAI path, where no
            word timings are available and cues are synthesized.

    Raises:
        ValueError: missing api_key, empty API response, or no segments.
        RuntimeError: faster_whisper selected but not installed.
    """
    logging.debug(f"Starting transcription with library: {library}, audio_path: {audio_path}")
    plain_text: Optional[str] = None
    if library == "OpenAI Whisper":
        if api_key is None:
            raise ValueError("api_key is required for OpenAI Whisper")
        import openai
        openai.api_key = api_key
        # --- Handle files above the OpenAI upload size limit ---
        if os.path.getsize(audio_path) > MAX_OPENAI_AUDIO_SIZE:
            logging.info("Audio troppo grande, suddivisione in segmenti...")
            audio = AudioSegment.from_file(audio_path)
            duration_ms = len(audio)
            # Split into 20-minute chunks; each chunk is re-checked (and
            # split further if needed) by _export_and_transcribe_segment.
            segment_length_ms = 20 * 60 * 1000
            segments = [audio[i : i + segment_length_ms] for i in range(0, duration_ms, segment_length_ms)]
            srt_parts = []
            txt_parts = []
            time_offset = 0.0
            for idx, seg in enumerate(segments):
                segs, txts = _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset)
                srt_parts.extend(segs)
                txt_parts.extend(txts)
                # Shift the next chunk's cues by this chunk's duration.
                time_offset += seg.duration_seconds
            segments = srt_parts
            plain_text = " ".join(txt_parts)
        else:
            with open(audio_path, "rb") as audio_file:
                result = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="json",
                )
            logging.debug(f"OpenAI API response: {result}")
            words = result.text.split()
            plain_text = result.text.strip()
            if not words:
                logging.error("No text returned by OpenAI Whisper API.")
                raise ValueError("No text returned by OpenAI Whisper API.")
            # The JSON response has no word timings: synthesize fixed
            # 3-second cues of words_per_sub words each.
            segments = []
            start = 0.0
            step = 3.0
            for i in range(0, len(words), words_per_sub):
                end = start + step
                text = " ".join(words[i : i + words_per_sub])
                segments.append(SubtitleLine(start=start, end=end, text=text))
                start = end
            logging.debug(f"Generated segments: {segments}")
    else:
        # Local transcription path (default).
        if WhisperModel is None:
            raise RuntimeError("faster_whisper is not installed")
        logging.debug("Using Faster Whisper for transcription...")
        model = WhisperModel(model_size)
        segs = model.transcribe(audio_path)[0]
        segments = [SubtitleLine(start=s.start, end=s.end, text=s.text) for s in segs]
        plain_text = " ".join([s.text.strip() for s in segments])
        logging.debug(f"Generated segments: {segments}")
    if not segments:
        logging.error("No segments generated during transcription.")
        raise ValueError("No segments generated during transcription.")
    srt_content = _segments_to_srt(segments)
    logging.debug(f"Generated SRT content: {srt_content}")
    return srt_content, plain_text
def save_srt(content: str, output_path: str) -> str:
    """Write SRT *content* to *output_path* (UTF-8) and return the path."""
    with open(output_path, "w", encoding="utf-8") as handle:
        handle.write(content)
    return output_path
def save_txt(content: str, output_path: str) -> str:
    """Write plain-text *content* to *output_path* (UTF-8) and return the path."""
    with open(output_path, "w", encoding="utf-8") as handle:
        handle.write(content)
    return output_path
def merge_subtitles(video_path: str, srt_path: str, output_path: str) -> str:
    """Burn the subtitles in *srt_path* into *video_path* with ffmpeg.

    The video stream is re-encoded with libx264 (required to render the
    subtitles) while the audio stream is copied untouched.  Returns
    *output_path*; raises ``subprocess.CalledProcessError`` if ffmpeg fails.

    NOTE(review): *srt_path* is interpolated into the ffmpeg filter string
    unescaped — paths containing ``:`` or quotes will break the filter.
    """
    command = ["ffmpeg", "-y", "-i", video_path]
    command += ["-vf", f"subtitles={srt_path}"]
    command += ["-c:a", "copy", "-c:v", "libx264", output_path]
    subprocess.run(command, check=True)
    return output_path