"""Utility functions for extracting audio, transcribing and merging subtitles.""" from __future__ import annotations import logging import os import subprocess from dataclasses import dataclass from typing import List, Optional from pydub import AudioSegment # MoviePy is an optional dependency used when extracting audio. It is imported # lazily to avoid issues when running in environments where it is not # available (for instance during unit tests). try: from faster_whisper import WhisperModel except ImportError: # pragma: no cover - optional dependency WhisperModel = None logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s") MAX_OPENAI_AUDIO_SIZE = 25 * 1024 * 1024 # 25 MB def format_timestamp(seconds: float) -> str: """Return timestamp in SRT format.""" h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = int(seconds % 60) ms = int((seconds - int(seconds)) * 1000) return f"{h:02}:{m:02}:{s:02},{ms:03}" def extract_audio(video_path: str, output_dir: str) -> str: """Extract audio from *video_path* and return the audio file path.""" if not os.path.exists(video_path): raise FileNotFoundError(video_path) os.makedirs(output_dir, exist_ok=True) base_name = os.path.splitext(os.path.basename(video_path))[0] audio_path = os.path.join(output_dir, f"{base_name}.wav") # Import here so tests that do not require MoviePy can run without the # dependency installed. from moviepy.editor import VideoFileClip clip = VideoFileClip(video_path) clip.audio.write_audiofile(audio_path, logger=None) clip.close() return audio_path @dataclass class SubtitleLine: start: float end: float text: str def _segments_to_srt(segments: List[SubtitleLine]) -> str: lines = [] for idx, seg in enumerate(segments, 1): lines.append(str(idx)) lines.append(f"{format_timestamp(seg.start)} --> {format_timestamp(seg.end)}") lines.append(seg.text.strip()) lines.append("") return "\n".join(lines) def _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset): """Esporta un segmento in MP3, verifica la dimensione e lo suddivide ricorsivamente se necessario.""" import tempfile segment_list = [] txt_list = [] with tempfile.NamedTemporaryFile(suffix=f"_part{idx}.mp3", delete=False) as temp_file: seg.export(temp_file.name, format="mp3") temp_size = os.path.getsize(temp_file.name) logging.debug(f"Segmento {idx}: dimensione {temp_size} byte (MP3)") if temp_size > MAX_OPENAI_AUDIO_SIZE: # Suddividi ulteriormente il segmento logging.info(f"Segmento {idx} ancora troppo grande, suddivisione ricorsiva...") duration_ms = len(seg) mid = duration_ms // 2 seg1 = seg[:mid] seg2 = seg[mid:] # Ricorsione su ciascuna metà segs1, txts1 = _export_and_transcribe_segment(seg1, f"{idx}a", audio_path, openai, words_per_sub, time_offset) segs2, txts2 = _export_and_transcribe_segment(seg2, f"{idx}b", audio_path, openai, words_per_sub, time_offset + seg1.duration_seconds) segment_list.extend(segs1) segment_list.extend(segs2) txt_list.extend(txts1) txt_list.extend(txts2) else: with open(temp_file.name, "rb") as audio_file: result = openai.audio.transcriptions.create( model="whisper-1", file=audio_file, response_format="json", ) words = result.text.split() plain = result.text.strip() txt_list.append(plain) # Ricostruisci segmenti SRT con offset temporale segs = [] start = time_offset step = 3.0 for i in range(0, len(words), words_per_sub): end = start + step text = " ".join(words[i : i + words_per_sub]) segs.append(SubtitleLine(start=start, end=end, text=text)) start = end segment_list.extend(segs) os.remove(temp_file.name) return segment_list, txt_list def transcribe_audio( audio_path: str, library: str = "faster_whisper", api_key: Optional[str] = None, model_size: str = "base", words_per_sub: int = 7, ) -> tuple[str, str]: """Transcribe *audio_path* and return (SRT content, plain text content).""" logging.debug(f"Starting transcription with library: {library}, audio_path: {audio_path}") plain_text = None if library == "OpenAI Whisper": if api_key is None: raise ValueError("api_key is required for OpenAI Whisper") import openai openai.api_key = api_key # --- Gestione file troppo grandi --- if os.path.getsize(audio_path) > MAX_OPENAI_AUDIO_SIZE: logging.info("Audio troppo grande, suddivisione in segmenti...") audio = AudioSegment.from_file(audio_path) duration_ms = len(audio) segment_length_ms = 20 * 60 * 1000 segments = [audio[i : i + segment_length_ms] for i in range(0, duration_ms, segment_length_ms)] srt_parts = [] txt_parts = [] time_offset = 0.0 for idx, seg in enumerate(segments): segs, txts = _export_and_transcribe_segment(seg, idx, audio_path, openai, words_per_sub, time_offset) srt_parts.extend(segs) txt_parts.extend(txts) time_offset += seg.duration_seconds segments = srt_parts plain_text = " ".join(txt_parts) else: with open(audio_path, "rb") as audio_file: result = openai.audio.transcriptions.create( model="whisper-1", file=audio_file, response_format="json", ) logging.debug(f"OpenAI API response: {result}") words = result.text.split() plain_text = result.text.strip() if not words: logging.error("No text returned by OpenAI Whisper API.") raise ValueError("No text returned by OpenAI Whisper API.") segments = [] start = 0.0 step = 3.0 for i in range(0, len(words), words_per_sub): end = start + step text = " ".join(words[i : i + words_per_sub]) segments.append(SubtitleLine(start=start, end=end, text=text)) start = end logging.debug(f"Generated segments: {segments}") else: if WhisperModel is None: raise RuntimeError("faster_whisper is not installed") logging.debug("Using Faster Whisper for transcription...") model = WhisperModel(model_size) segs = model.transcribe(audio_path)[0] segments = [SubtitleLine(start=s.start, end=s.end, text=s.text) for s in segs] plain_text = " ".join([s.text.strip() for s in segments]) logging.debug(f"Generated segments: {segments}") if not segments: logging.error("No segments generated during transcription.") raise ValueError("No segments generated during transcription.") srt_content = _segments_to_srt(segments) logging.debug(f"Generated SRT content: {srt_content}") return srt_content, plain_text def save_srt(content: str, output_path: str) -> str: with open(output_path, "w", encoding="utf-8") as f: f.write(content) return output_path def save_txt(content: str, output_path: str) -> str: with open(output_path, "w", encoding="utf-8") as f: f.write(content) return output_path def merge_subtitles(video_path: str, srt_path: str, output_path: str) -> str: command = [ "ffmpeg", "-y", "-i", video_path, "-vf", f"subtitles={srt_path}", "-c:a", "copy", "-c:v", "libx264", output_path, ] subprocess.run(command, check=True) return output_path