File size: 8,270 Bytes
0d6f640 3c11817 0d6f640 3c11817 0d6f640 3c11817 0d6f640 3c11817 0d6f640 3c11817 0d6f640 3c11817 0d6f640 3c11817 0d6f640 3c11817 0d6f640 3c11817 0d6f640 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
"""Utility functions for extracting audio, transcribing and merging subtitles."""
from __future__ import annotations
import logging
import os
import subprocess
from dataclasses import dataclass
from typing import List, Optional
from pydub import AudioSegment
# MoviePy is an optional dependency used when extracting audio. It is imported
# lazily to avoid issues when running in environments where it is not
# available (for instance during unit tests).
try:
from faster_whisper import WhisperModel
except ImportError: # pragma: no cover - optional dependency
WhisperModel = None
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
MAX_OPENAI_AUDIO_SIZE = 25 * 1024 * 1024 # 25 MB
def format_timestamp(seconds: float) -> str:
    """Convert a duration in seconds to an SRT timestamp (``HH:MM:SS,mmm``)."""
    whole = int(seconds)
    millis = int((seconds - whole) * 1000)
    minutes, secs = divmod(whole, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"
def extract_audio(video_path: str, output_dir: str) -> str:
    """Extract the audio track from *video_path* into *output_dir* as WAV.

    Args:
        video_path: path to the source video file.
        output_dir: directory for the extracted audio (created if missing).

    Returns:
        The path of the written ``.wav`` file.

    Raises:
        FileNotFoundError: if *video_path* does not exist.
        ValueError: if the video contains no audio track.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(video_path)
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = os.path.join(output_dir, f"{base_name}.wav")
    # Import here so tests that do not require MoviePy can run without the
    # dependency installed.
    from moviepy.editor import VideoFileClip
    clip = VideoFileClip(video_path)
    try:
        # A video without an audio stream yields clip.audio == None; fail with
        # a clear error instead of an AttributeError.
        if clip.audio is None:
            raise ValueError(f"No audio track found in {video_path}")
        clip.audio.write_audiofile(audio_path, logger=None)
    finally:
        # Release the underlying readers even when the export fails.
        clip.close()
    return audio_path
@dataclass
class SubtitleLine:
    """A single subtitle cue: text displayed between two timestamps."""

    # Cue start time, in seconds from the beginning of the media.
    start: float
    # Cue end time, in seconds.
    end: float
    # Subtitle text to display during the interval.
    text: str
def _segments_to_srt(segments: List[SubtitleLine]) -> str:
    """Render subtitle cues as the full text of an SRT file."""
    parts: List[str] = []
    for index, cue in enumerate(segments, 1):
        timing = f"{format_timestamp(cue.start)} --> {format_timestamp(cue.end)}"
        # Each SRT entry: counter, timing line, text, then a blank separator.
        parts.extend((str(index), timing, cue.text.strip(), ""))
    return "\n".join(parts)
def _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset):
    """Export a pydub segment to MP3, check its size and transcribe it.

    If the exported MP3 still exceeds ``MAX_OPENAI_AUDIO_SIZE`` the segment is
    halved and each half is processed recursively, so arbitrarily large inputs
    eventually fit under the OpenAI upload limit.

    Args:
        seg: pydub ``AudioSegment`` to transcribe.
        idx: label used for logging and temp-file naming (e.g. ``3``, ``"3a"``).
        audio_path: path of the original audio file (currently unused; kept
            for signature compatibility with callers).
        openai: the imported ``openai`` module with the API key configured.
        words_per_sub: number of words per generated subtitle cue.
        time_offset: absolute start time (seconds) of *seg* in the full audio.

    Returns:
        A tuple ``(segments, texts)``: a list of ``SubtitleLine`` cues and a
        list of plain transcription strings.
    """
    import tempfile
    segment_list = []
    txt_list = []
    # Create a uniquely named file and close the handle immediately so that
    # ffmpeg/pydub can write to it by name (required on Windows, harmless
    # elsewhere).
    with tempfile.NamedTemporaryFile(suffix=f"_part{idx}.mp3", delete=False) as temp_file:
        temp_name = temp_file.name
    try:
        seg.export(temp_name, format="mp3")
        temp_size = os.path.getsize(temp_name)
        logging.debug(f"Segmento {idx}: dimensione {temp_size} byte (MP3)")
        if temp_size > MAX_OPENAI_AUDIO_SIZE:
            # Still over the API limit: split the segment in half and recurse
            # on each part, shifting the second half's time offset.
            logging.info(f"Segmento {idx} ancora troppo grande, suddivisione ricorsiva...")
            duration_ms = len(seg)
            mid = duration_ms // 2
            seg1 = seg[:mid]
            seg2 = seg[mid:]
            segs1, txts1 = _export_and_transcribe_segment(
                seg1, f"{idx}a", audio_path, openai, words_per_sub, time_offset
            )
            segs2, txts2 = _export_and_transcribe_segment(
                seg2, f"{idx}b", audio_path, openai, words_per_sub,
                time_offset + seg1.duration_seconds,
            )
            segment_list.extend(segs1)
            segment_list.extend(segs2)
            txt_list.extend(txts1)
            txt_list.extend(txts2)
        else:
            with open(temp_name, "rb") as audio_file:
                result = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="json",
                )
            words = result.text.split()
            plain = result.text.strip()
            txt_list.append(plain)
            # Rebuild fixed-length SRT cues, offset to absolute time.
            segs = []
            start = time_offset
            step = 3.0  # seconds per generated cue
            for i in range(0, len(words), words_per_sub):
                end = start + step
                text = " ".join(words[i : i + words_per_sub])
                segs.append(SubtitleLine(start=start, end=end, text=text))
                start = end
            segment_list.extend(segs)
    finally:
        # The temp file was created with delete=False: remove it ourselves,
        # even when the export or the API call raised.
        if os.path.exists(temp_name):
            os.remove(temp_name)
    return segment_list, txt_list
def transcribe_audio(
    audio_path: str,
    library: str = "faster_whisper",
    api_key: Optional[str] = None,
    model_size: str = "base",
    words_per_sub: int = 7,
) -> tuple[str, str]:
    """Transcribe *audio_path* and return (SRT content, plain text content).

    Args:
        audio_path: path of the audio file to transcribe.
        library: ``"OpenAI Whisper"`` to use the OpenAI API; any other value
            falls back to the local faster_whisper model.
        api_key: OpenAI API key; required when ``library == "OpenAI Whisper"``.
        model_size: faster_whisper model size (ignored on the OpenAI path).
        words_per_sub: words per generated subtitle cue (OpenAI path only;
            faster_whisper returns its own segment timings).

    Raises:
        ValueError: missing api_key, empty API response, or no segments.
        RuntimeError: faster_whisper requested but not installed.
    """
    logging.debug(f"Starting transcription with library: {library}, audio_path: {audio_path}")
    plain_text = None
    if library == "OpenAI Whisper":
        if api_key is None:
            raise ValueError("api_key is required for OpenAI Whisper")
        import openai
        openai.api_key = api_key
        # --- Handling of files above the OpenAI upload limit ---
        if os.path.getsize(audio_path) > MAX_OPENAI_AUDIO_SIZE:
            logging.info("Audio troppo grande, suddivisione in segmenti...")
            audio = AudioSegment.from_file(audio_path)
            duration_ms = len(audio)
            # Cut the audio into 20-minute chunks; the helper exports each
            # chunk to MP3 and recursively splits it further if still too big.
            segment_length_ms = 20 * 60 * 1000
            segments = [audio[i : i + segment_length_ms] for i in range(0, duration_ms, segment_length_ms)]
            srt_parts = []
            txt_parts = []
            time_offset = 0.0
            for idx, seg in enumerate(segments):
                segs, txts = _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset)
                srt_parts.extend(segs)
                txt_parts.extend(txts)
                # Advance the absolute clock by this chunk's duration so the
                # next chunk's cues start at the right time.
                time_offset += seg.duration_seconds
            segments = srt_parts
            plain_text = " ".join(txt_parts)
        else:
            with open(audio_path, "rb") as audio_file:
                result = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="json",
                )
            logging.debug(f"OpenAI API response: {result}")
            words = result.text.split()
            plain_text = result.text.strip()
            if not words:
                logging.error("No text returned by OpenAI Whisper API.")
                raise ValueError("No text returned by OpenAI Whisper API.")
            # The API's json response carries no word timings, so cues are
            # synthesized at a fixed 3 s per `words_per_sub` words.
            segments = []
            start = 0.0
            step = 3.0
            for i in range(0, len(words), words_per_sub):
                end = start + step
                text = " ".join(words[i : i + words_per_sub])
                segments.append(SubtitleLine(start=start, end=end, text=text))
                start = end
            logging.debug(f"Generated segments: {segments}")
    else:
        if WhisperModel is None:
            raise RuntimeError("faster_whisper is not installed")
        logging.debug("Using Faster Whisper for transcription...")
        model = WhisperModel(model_size)
        # transcribe() returns (segments, info); only the segments are needed.
        segs = model.transcribe(audio_path)[0]
        segments = [SubtitleLine(start=s.start, end=s.end, text=s.text) for s in segs]
        plain_text = " ".join([s.text.strip() for s in segments])
        logging.debug(f"Generated segments: {segments}")
    if not segments:
        logging.error("No segments generated during transcription.")
        raise ValueError("No segments generated during transcription.")
    srt_content = _segments_to_srt(segments)
    logging.debug(f"Generated SRT content: {srt_content}")
    return srt_content, plain_text
def save_srt(content: str, output_path: str) -> str:
    """Write SRT *content* to *output_path* (UTF-8) and return the path."""
    with open(output_path, "w", encoding="utf-8") as handle:
        handle.write(content)
    return output_path
def save_txt(content: str, output_path: str) -> str:
    """Write plain-text *content* to *output_path* (UTF-8) and return the path."""
    with open(output_path, "w", encoding="utf-8") as handle:
        handle.write(content)
    return output_path
def merge_subtitles(video_path: str, srt_path: str, output_path: str) -> str:
    """Burn the subtitles from *srt_path* into *video_path* via ffmpeg.

    The video stream is re-encoded with libx264 (burning subtitles requires a
    re-encode); the audio stream is copied unchanged.

    Args:
        video_path: source video file.
        srt_path: SRT file whose cues are rendered into the frames.
        output_path: destination path for the subtitled video.

    Returns:
        *output_path* on success.

    Raises:
        FileNotFoundError: if the video or the subtitle file does not exist.
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    # Fail fast with a clear error instead of a cryptic ffmpeg failure.
    for required in (video_path, srt_path):
        if not os.path.exists(required):
            raise FileNotFoundError(required)
    # NOTE(review): srt_path is interpolated into the filter string unescaped;
    # paths containing ':' or quotes will break the subtitles filter — confirm
    # callers only pass simple paths, or add ffmpeg filter escaping.
    command = [
        "ffmpeg",
        "-y",  # overwrite the output file if it already exists
        "-i",
        video_path,
        "-vf",
        f"subtitles={srt_path}",
        "-c:a",
        "copy",
        "-c:v",
        "libx264",
        output_path,
    ]
    subprocess.run(command, check=True)
    return output_path
|