| from __future__ import annotations
|
|
|
| import os
|
| import shlex
|
| import subprocess
|
| from pathlib import Path
|
| from typing import Dict, Any, List, Tuple, Optional
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_audio_ffmpeg(video_path: str, audio_out: Path, sr: int = 16000, mono: bool = True) -> str:
    """Extract the audio track of *video_path* to a WAV file via ffmpeg.

    Args:
        video_path: Path to the input video file.
        audio_out: Destination WAV path; parent directories are created.
        sr: Target sample rate in Hz (default 16 kHz).
        mono: Downmix to a single channel when True.

    Returns:
        The output path as a string.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    audio_out.parent.mkdir(parents=True, exist_ok=True)
    # Build the argv list directly instead of formatting a shell-style string:
    # the old f-string + shlex.split approach broke on file names containing
    # double quotes and relied on string interpolation for the -ac flag.
    cmd: List[str] = ["ffmpeg", "-y", "-i", str(video_path), "-vn"]
    if mono:
        cmd += ["-ac", "1"]
    cmd += ["-ar", str(sr), "-f", "wav", str(audio_out)]
    subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return str(audio_out)
|
|
|
|
|
| def _get_video_duration_seconds(video_path: str) -> float:
|
| try:
|
|
|
| cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=duration -of default=nw=1 "{video_path}"'
|
| out = subprocess.check_output(shlex.split(cmd), stderr=subprocess.DEVNULL).decode("utf-8", errors="ignore")
|
| for line in out.splitlines():
|
| if line.startswith("duration="):
|
| try:
|
| return float(line.split("=", 1)[1])
|
| except Exception:
|
| pass
|
| except Exception:
|
| pass
|
| return 0.0
|
|
|
|
|
def diarize_audio(wav_path: str, base_dir: Path, hf_token_env: str | None = None) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Returns segments [{'start','end','speaker'}] and dummy clip_paths (not used in MVP).

    Best-effort: when no auth token is available, or when anything in the
    pyannote pipeline fails, falls back to a single placeholder segment
    {'start': 0.0, 'end': 0.0, 'speaker': 'SPEAKER_00'} so callers always
    get a non-empty list. NOTE(review): base_dir is currently unused; kept
    for interface stability.
    """
    segments: List[Dict[str, Any]] = []
    # Always returned empty in this MVP (see docstring).
    clip_paths: List[str] = []

    # Token resolution order: PYANNOTE_TOKEN wins; otherwise the env var
    # named by hf_token_env; otherwise HF_TOKEN.
    token = os.getenv("PYANNOTE_TOKEN") or (os.getenv(hf_token_env) if hf_token_env else os.getenv("HF_TOKEN"))
    try:
        if token:
            # Lazy import: pyannote is heavy/optional; an ImportError here is
            # caught below and degrades to the placeholder segment.
            from pyannote.audio import Pipeline
            pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=token)
            diarization = pipeline(wav_path)

            for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
                segments.append({
                    # getattr + "or 0.0" guards against missing/None turn bounds.
                    "start": float(getattr(turn, "start", 0.0) or 0.0),
                    "end": float(getattr(turn, "end", 0.0) or 0.0),
                    "speaker": str(speaker) if speaker is not None else f"SPEAKER_{i:02d}",
                })
        else:
            # No token: degrade to a single placeholder speaker.
            segments.append({"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"})
    except Exception:
        # Any diarization failure (import, download, inference) yields the
        # same placeholder — deliberate best-effort behavior.
        segments.append({"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"})

    # Ensure chronological order for downstream SRT generation.
    segments = sorted(segments, key=lambda s: s.get("start", 0.0))
    return segments, clip_paths
|
|
|
|
|
| def _fmt_srt_time(seconds: float) -> str:
|
| h = int(seconds // 3600)
|
| m = int((seconds % 3600) // 60)
|
| s = int(seconds % 60)
|
| ms = int(round((seconds - int(seconds)) * 1000))
|
| return f"{h:02}:{m:02}:{s:02},{ms:03}"
|
|
|
|
|
def _generate_srt(segments: List[Dict[str, Any]], texts: List[str]) -> str:
    """Render paired diarization segments and transcript texts as SRT.

    Pairs are taken positionally; whichever list is longer has its excess
    entries dropped. When a segment carries a speaker label, the cue text
    is prefixed with ``[SPEAKER]:``. A missing 'end' defaults to two
    seconds past 'start'. Always returns text ending in a newline.
    """
    blocks: List[str] = []
    for idx, (seg, raw) in enumerate(zip(segments, texts), start=1):
        begin = float(seg.get("start", 0.0))
        finish = float(seg.get("end", max(begin + 2.0, begin)))
        caption = (raw or "").strip()
        who = seg.get("speaker")
        if who:
            caption = f"[{who}]: {caption}" if caption else f"[{who}]"
        blocks.append(str(idx))
        blocks.append(f"{_fmt_srt_time(begin)} --> {_fmt_srt_time(finish)}")
        blocks.append(caption)
        blocks.append("")
    return "\n".join(blocks).strip() + "\n"
|
|
|
|
|
def asr_transcribe_wav_simple(wav_path: str) -> str:
    """Best-effort ASR stub for the Spaces MVP (no heavy GPU assumed).

    Tries faster-whisper's small model on CPU; on any failure — the
    package being absent, model download failing, or transcription
    erroring — returns an empty string instead of raising.
    """
    try:
        from faster_whisper import WhisperModel
    except Exception:
        # Optional dependency not installed: degrade silently.
        return ""
    try:
        model = WhisperModel("Systran/faster-whisper-small", device="cpu")
        segs, _info = model.transcribe(wav_path, vad_filter=True, without_timestamps=True, language=None)
        pieces = [s.text.strip() for s in segs if getattr(s, "text", None)]
        return " ".join(pieces).strip()
    except Exception:
        # Deliberate best-effort: any runtime failure yields empty text.
        return ""
|
|
|
|
|
def generate(video_path: str, out_dir: Path) -> Dict[str, Any]:
    """End-to-end MVP that returns {'une_srt','free_text','artifacts':{...}}."""
    out_dir.mkdir(parents=True, exist_ok=True)

    # 1) Pull the audio track out of the video as a WAV next to the outputs.
    wav_path = extract_audio_ffmpeg(video_path, out_dir / f"{Path(video_path).stem}.wav")

    # 2) Speaker diarization (best-effort; may yield one placeholder speaker).
    segments, _ = diarize_audio(wav_path, out_dir, hf_token_env="HF_TOKEN")

    # 3) Transcription (empty string when no ASR backend is available).
    free_text = asr_transcribe_wav_simple(wav_path)

    if not segments:
        segments = [{"start": 0.0, "end": 0.0, "speaker": "SPEAKER_00"}]

    # 4) Distribute the flat transcript across segments by equal word count;
    #    the last segment absorbs any remainder.
    texts: List[str] = []
    if len(segments) <= 1:
        texts = [free_text]
    else:
        words = free_text.split()
        per_seg = max(1, len(words) // len(segments))
        for idx in range(len(segments)):
            lo = idx * per_seg
            hi = (idx + 1) * per_seg if idx < len(segments) - 1 else len(words)
            texts.append(" ".join(words[lo:hi]))

    return {
        "une_srt": _generate_srt(segments, texts),
        "free_text": free_text,
        "artifacts": {
            "wav_path": str(wav_path),
        },
    }
|
|
|