|
|
"""Utility functions for extracting audio, transcribing and merging subtitles.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import logging |
|
|
import os |
|
|
import subprocess |
|
|
from dataclasses import dataclass |
|
|
from typing import List, Optional |
|
|
|
|
|
from pydub import AudioSegment |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# faster-whisper is optional: when it is missing, WhisperModel stays None and
# transcribe_audio() can only use the OpenAI API backend.
try:
    from faster_whisper import WhisperModel
except ImportError:
    WhisperModel = None

# NOTE(review): configuring the root logger at DEBUG on import affects every
# consumer of this module — consider a module-level getLogger(__name__) instead.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")

# Upload limit enforced by the OpenAI audio transcription endpoint (25 MB);
# larger files are split into chunks before being sent.
MAX_OPENAI_AUDIO_SIZE = 25 * 1024 * 1024
|
|
|
|
|
|
|
|
def format_timestamp(seconds: float) -> str:
    """Format *seconds* as an SRT timestamp, ``HH:MM:SS,mmm``."""
    whole = int(seconds)
    ms = int((seconds - whole) * 1000)
    h, rem = divmod(whole, 3600)
    m, s = divmod(rem, 60)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
|
|
|
|
|
|
|
|
def extract_audio(video_path: str, output_dir: str) -> str:
    """Extract the audio track from *video_path* into *output_dir* as WAV.

    Args:
        video_path: Path to the source video file.
        output_dir: Directory for the output file (created if missing).

    Returns:
        Path of the generated ``.wav`` file.

    Raises:
        FileNotFoundError: if *video_path* does not exist.
        ValueError: if the video has no audio track.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(video_path)
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = os.path.join(output_dir, f"{base_name}.wav")

    # Imported lazily so the module still loads when moviepy is not installed.
    from moviepy.editor import VideoFileClip

    clip = VideoFileClip(video_path)
    try:
        # A video without an audio stream yields clip.audio == None; fail
        # with a clear error instead of an AttributeError.
        if clip.audio is None:
            raise ValueError(f"No audio track found in {video_path}")
        clip.audio.write_audiofile(audio_path, logger=None)
    finally:
        # Release the clip's readers/file handles even if the export failed.
        clip.close()
    return audio_path
|
|
|
|
|
|
|
|
@dataclass
class SubtitleLine:
    """One subtitle cue: a time span plus the text shown during it."""

    # Start and end are offsets in seconds from the beginning of the audio.
    start: float
    end: float
    text: str
|
|
|
|
|
|
|
|
def _segments_to_srt(segments: List[SubtitleLine]) -> str:
    """Render *segments* as SRT text (index, time range, text, blank line)."""
    parts: List[str] = []
    for number, cue in enumerate(segments, 1):
        parts.extend(
            (
                str(number),
                f"{format_timestamp(cue.start)} --> {format_timestamp(cue.end)}",
                cue.text.strip(),
                "",
            )
        )
    return "\n".join(parts)
|
|
|
|
|
|
|
|
def _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset):
    """Export *seg* to MP3, check its size and split it recursively if needed.

    Args:
        seg: pydub audio chunk to transcribe.
        idx: Label for logging / temp-file naming (int, or str like ``"0a"``
            for recursive halves).
        audio_path: Path of the source audio; unused here, kept for interface
            stability with the caller.
        openai: The configured ``openai`` module.
        words_per_sub: Number of words per generated subtitle line.
        time_offset: Start time (seconds) of *seg* within the full audio.

    Returns:
        ``(subtitle_lines, text_chunks)`` — a list of SubtitleLine objects and
        a list of plain-text transcription chunks.
    """
    import tempfile

    segment_list = []
    txt_list = []
    # delete=False keeps the file on disk after the handle closes; we remove
    # it ourselves in the finally block so it is cleaned up even when the
    # export or the API call raises (the previous version leaked it on error).
    with tempfile.NamedTemporaryFile(suffix=f"_part{idx}.mp3", delete=False) as temp_file:
        temp_path = temp_file.name
    try:
        seg.export(temp_path, format="mp3")
        temp_size = os.path.getsize(temp_path)
        logging.debug(f"Segmento {idx}: dimensione {temp_size} byte (MP3)")
        if temp_size > MAX_OPENAI_AUDIO_SIZE:
            # Still above the OpenAI upload limit: split in half and recurse
            # on each half, shifting the second half's offset by the first
            # half's real duration.
            logging.info(f"Segmento {idx} ancora troppo grande, suddivisione ricorsiva...")
            mid = len(seg) // 2
            seg1 = seg[:mid]
            seg2 = seg[mid:]

            segs1, txts1 = _export_and_transcribe_segment(seg1, f"{idx}a", audio_path, openai, words_per_sub, time_offset)
            segs2, txts2 = _export_and_transcribe_segment(seg2, f"{idx}b", audio_path, openai, words_per_sub, time_offset + seg1.duration_seconds)
            segment_list.extend(segs1)
            segment_list.extend(segs2)
            txt_list.extend(txts1)
            txt_list.extend(txts2)
        else:
            with open(temp_path, "rb") as audio_file:
                result = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="json",
                )
            words = result.text.split()
            plain = result.text.strip()
            txt_list.append(plain)

            # The JSON response carries no word timings, so synthesize fixed
            # 3-second windows of words_per_sub words each.
            # NOTE(review): these synthetic windows are not tied to the real
            # audio timing and can drift from the spoken words.
            segs = []
            start = time_offset
            step = 3.0
            for i in range(0, len(words), words_per_sub):
                end = start + step
                text = " ".join(words[i : i + words_per_sub])
                segs.append(SubtitleLine(start=start, end=end, text=text))
                start = end
            segment_list.extend(segs)
    finally:
        os.remove(temp_path)
    return segment_list, txt_list
|
|
|
|
|
|
|
|
def transcribe_audio(
    audio_path: str,
    library: str = "faster_whisper",
    api_key: Optional[str] = None,
    model_size: str = "base",
    words_per_sub: int = 7,
) -> tuple[str, str]:
    """Transcribe *audio_path* and return (SRT content, plain text content).

    Args:
        audio_path: Path to the audio file to transcribe.
        library: ``"OpenAI Whisper"`` selects the remote API backend; any
            other value selects the local faster-whisper backend.
        api_key: OpenAI API key — required only for the API backend.
        model_size: faster-whisper model size (ignored by the API backend).
        words_per_sub: Words per generated subtitle line (API backend only,
            which synthesizes fixed 3-second timestamps).

    Raises:
        ValueError: missing api_key, empty API response, or no segments.
        RuntimeError: faster-whisper backend requested but not installed.
    """
    logging.debug(f"Starting transcription with library: {library}, audio_path: {audio_path}")

    plain_text = None
    if library == "OpenAI Whisper":
        if api_key is None:
            raise ValueError("api_key is required for OpenAI Whisper")
        import openai

        openai.api_key = api_key

        if os.path.getsize(audio_path) > MAX_OPENAI_AUDIO_SIZE:
            # File exceeds the API upload limit: split into 20-minute chunks
            # and transcribe each chunk separately.
            logging.info("Audio troppo grande, suddivisione in segmenti...")
            audio = AudioSegment.from_file(audio_path)
            duration_ms = len(audio)
            segment_length_ms = 20 * 60 * 1000
            segments = [audio[i : i + segment_length_ms] for i in range(0, duration_ms, segment_length_ms)]
            srt_parts = []
            txt_parts = []
            time_offset = 0.0
            for idx, seg in enumerate(segments):
                segs, txts = _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset)
                srt_parts.extend(segs)
                txt_parts.extend(txts)
                # Shift the next chunk's subtitles by this chunk's real duration.
                time_offset += seg.duration_seconds
            # From here on `segments` holds SubtitleLine objects, not audio chunks.
            segments = srt_parts
            plain_text = " ".join(txt_parts)
        else:
            with open(audio_path, "rb") as audio_file:
                result = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="json",
                )
            logging.debug(f"OpenAI API response: {result}")
            words = result.text.split()
            plain_text = result.text.strip()
            if not words:
                logging.error("No text returned by OpenAI Whisper API.")
                raise ValueError("No text returned by OpenAI Whisper API.")
            # The JSON response has no word timings, so synthesize fixed
            # 3-second windows of words_per_sub words each.
            segments = []
            start = 0.0
            step = 3.0
            for i in range(0, len(words), words_per_sub):
                end = start + step
                text = " ".join(words[i : i + words_per_sub])
                segments.append(SubtitleLine(start=start, end=end, text=text))
                start = end
            logging.debug(f"Generated segments: {segments}")
    else:
        if WhisperModel is None:
            raise RuntimeError("faster_whisper is not installed")
        logging.debug("Using Faster Whisper for transcription...")
        model = WhisperModel(model_size)
        # transcribe() returns (segments, info); only the segments are needed.
        segs = model.transcribe(audio_path)[0]
        segments = [SubtitleLine(start=s.start, end=s.end, text=s.text) for s in segs]
        plain_text = " ".join([s.text.strip() for s in segments])
        logging.debug(f"Generated segments: {segments}")

    if not segments:
        logging.error("No segments generated during transcription.")
        raise ValueError("No segments generated during transcription.")

    srt_content = _segments_to_srt(segments)
    logging.debug(f"Generated SRT content: {srt_content}")
    return srt_content, plain_text
|
|
|
|
|
|
|
|
def save_srt(content: str, output_path: str) -> str:
    """Write *content* to *output_path* as UTF-8 and return the path."""
    handle = open(output_path, "w", encoding="utf-8")
    try:
        handle.write(content)
    finally:
        handle.close()
    return output_path
|
|
|
|
|
|
|
|
def save_txt(content: str, output_path: str) -> str:
    """Persist *content* to *output_path* (UTF-8 text) and return that path."""
    with open(output_path, mode="w", encoding="utf-8") as out:
        out.write(content)
    return output_path
|
|
|
|
|
|
|
|
def merge_subtitles(video_path: str, srt_path: str, output_path: str) -> str:
    """Burn the subtitles in *srt_path* into *video_path*.

    Re-encodes the video stream with libx264 (the subtitles filter requires
    re-encoding) while copying the audio stream unchanged.  Returns
    *output_path*; raises ``subprocess.CalledProcessError`` if ffmpeg fails.
    """
    command = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vf", f"subtitles={srt_path}",
        "-c:a", "copy",
        "-c:v", "libx264",
        output_path,
    ]
    subprocess.run(command, check=True)
    return output_path
|
|
|
|
|
|