#!/usr/bin/env python3
"""
interview_cuts.py — Build cuts for interviews in Portuguese (short question + long answer).

Typical usage:
  python interview_cuts.py video.mp4 --min 60 --max 150 --qmax 12 --gap 2.0 \
      --lead-in-question yes --max-cuts 20 --preview

Prerequisites:
  - A <base>_transcript.json file in the same folder as the video
    (generated by video_cuts_offline_mac_plus_subs.py).

Outputs:
  - <base>_interview_cuts.json
  - <base>_interview_cuts.sh
  - PREVIEW_<base>_interview.mp4 (optional; produced by running the .sh script)
"""

import argparse
import json
import math
import os
import re
import shlex
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import numpy as np
except Exception:
    np = None

try:
    from resemblyzer import VoiceEncoder, preprocess_wav
    _HAVE_RESEMBLYZER = True
except Exception:
    VoiceEncoder = None
    preprocess_wav = None
    _HAVE_RESEMBLYZER = False


def load_json(p: Path):
    """Read and return the JSON document stored at *p* (UTF-8)."""
    with p.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(obj, p: Path):
    """Write *obj* to *p* as indented UTF-8 JSON, keeping non-ASCII characters."""
    with p.open("w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)


def normspace(s: str) -> str:
    """Collapse every whitespace run in *s* to one space; ``None`` becomes ``""``."""
    return re.sub(r"\s+", " ", (s or "").strip())


def first_sentence(s: str, limit=120) -> str:
    """Return the first sentence of *s*, truncated to *limit* characters.

    A sentence ends at ``.``, ``!`` or ``?`` followed by whitespace. When no
    such boundary exists the whole normalized text is used.
    """
    s = normspace(s)
    parts = re.split(r"(?<=[.!?])\s+", s)
    out = parts[0] if parts and parts[0] else s
    return out[:limit].rstrip()


def _seg_times(seg: Dict[str, Any]):
    """Return ``(start, end)`` of a transcript segment as floats (missing keys -> 0)."""
    return float(seg.get("start", 0)), float(seg.get("end", 0))


def ensure_wav_16k_mono(video_path: Path) -> Path:
    """Export a 16 kHz mono wav next to the video, reusing it if already present.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    wav_path = video_path.with_suffix(".16k.wav")
    if wav_path.exists():
        return wav_path
    cmd = [
        "ffmpeg", "-y", "-i", str(video_path),
        "-ac", "1", "-ar", "16000",
        str(wav_path),
    ]
    subprocess.run(cmd, check=True)
    return wav_path


def diarize_with_resemblyzer(wav_path: Path, n_speakers: int = 2, debug: bool = False):
    """Lightweight diarization using Resemblyzer partial embeddings.

    Partial embeddings are clustered with agglomerative clustering and
    consecutive partials with the same label are merged into segments.

    Args:
        wav_path: Audio file to analyse.
        n_speakers: Requested number of clusters (values below 2 are raised to 2).
        debug: Accepted for interface compatibility; currently unused.

    Returns:
        A list of ``{"start", "end", "spk"}`` dicts (seconds, rounded to 3
        decimals), or ``[]`` when fewer than two partial embeddings exist.

    Raises:
        RuntimeError: if resemblyzer, numpy or scikit-learn is not importable.
    """
    if not _HAVE_RESEMBLYZER or np is None:
        raise RuntimeError("pip install resemblyzer numpy scikit-learn soundfile")
    try:
        from sklearn.cluster import AgglomerativeClustering
    except Exception as exc:
        raise RuntimeError("pip install scikit-learn") from exc

    # NOTE(review): times below are relative to the preprocessed audio; if
    # preprocess_wav trims silence they may drift from the video timeline — confirm.
    wav = preprocess_wav(str(wav_path))
    enc = VoiceEncoder()
    _, partial_embeds, _partial_slices = enc.embed_utterance(wav, return_partials=True)

    sr = 16000.0
    duration = float(len(wav)) / sr if len(wav) > 0 else 0.0
    n_parts = len(partial_embeds)
    if n_parts == 0 or duration <= 0.0:
        return []
    if n_parts < 2:
        # Clustering needs at least two samples.
        return []

    # Half-width (seconds) attributed to each partial window.
    # NOTE(review): presumably mirrors Resemblyzer's partial window length — confirm.
    half = 0.8
    if duration <= 2 * half:
        # Audio shorter than one window: centre every partial on the clip.
        # One entry per partial keeps indexing by label position valid.
        partial_times = np.full(n_parts, duration / 2.0, dtype=float)
    else:
        # Partial centres are approximated as evenly spaced over the audio.
        partial_times = np.linspace(half, duration - half, n_parts)

    X = np.vstack(partial_embeds)
    n_speakers = max(2, int(n_speakers))
    n_clusters = max(2, min(n_speakers, X.shape[0]))
    labels = AgglomerativeClustering(n_clusters=n_clusters).fit_predict(X)

    def _emit(start: float, end: float, spk: int) -> Dict[str, Any]:
        # Clamp the start at 0 and guarantee a minimum 0.1 s extent.
        return {
            "start": round(max(0.0, start), 3),
            "end": round(max(end, start + 0.1), 3),
            "spk": spk,
        }

    segs = []
    cur_spk = int(labels[0])
    cur_start = max(0.0, float(partial_times[0] - half))
    cur_end = float(partial_times[0] + half)
    for i in range(1, len(labels)):
        spk = int(labels[i])
        st = float(partial_times[i] - half)
        en = float(partial_times[i] + half)
        if spk == cur_spk and st <= cur_end + 0.1:
            # Same speaker and (nearly) touching windows: extend the current run.
            cur_end = max(cur_end, en)
        else:
            segs.append(_emit(cur_start, cur_end, cur_spk))
            cur_spk, cur_start, cur_end = spk, st, en
    segs.append(_emit(cur_start, cur_end, cur_spk))
    return segs


def assign_speakers_to_transcript(transcript: List[Dict[str, Any]], diar_segs: List[Dict[str, Any]]):
    """Return one speaker label per transcript segment.

    Each segment is labelled by the diarization segment containing its midpoint
    (with 0.1 s tolerance); otherwise by the diarization segment whose centre
    is nearest. ``-1`` is used when *diar_segs* is empty.
    """
    def spk_at(t: float):
        for s in diar_segs:
            if s["start"] - 0.1 <= t <= s["end"] + 0.1:
                return s["spk"]
        if diar_segs:
            nearest = min(diar_segs, key=lambda s: abs((s["start"] + s["end"]) / 2 - t))
            return nearest["spk"]
        return -1

    labels = []
    for seg in transcript:
        st, en = _seg_times(seg)
        labels.append(spk_at((st + en) / 2.0))
    return labels


def detect_questions(transcript: List[Dict[str, Any]], qmax: float, wc_max: int,
                     qmark_required: bool, debug: bool = False) -> List[int]:
    """Return indices of transcript segments that look like questions.

    A segment qualifies when it has between 2 and *wc_max* words and either
    ends with ``?`` or lasts at most *qmax* seconds. With *qmark_required*
    only the ``?`` criterion counts. *debug* is currently unused.
    """
    idxs = []
    for i, seg in enumerate(transcript):
        st, en = _seg_times(seg)
        d = max(0.0, en - st)
        text = (seg.get("text") or "").strip()
        wc = len(text.split())
        has_qmark = text.endswith("?")
        wc_ok = 2 <= wc <= wc_max
        if qmark_required:
            is_q = has_qmark and wc_ok
        else:
            is_q = (has_qmark or d <= qmax) and wc_ok
        if is_q:
            idxs.append(i)
    return idxs


def build_interview_cuts(
    transcript: List[Dict[str, Any]],
    min_len: float,
    max_len: float,
    qmax: float,
    gap: float,
    lead_in_question: bool,
    max_cuts: int,
    wc_max: int = 35,
    qmark_required: bool = False,
    spk_labels: Optional[List[int]] = None,
    interviewer_id: Optional[int] = None,
    debug: bool = False,
) -> List[Dict[str, Any]]:
    """Build "question + answer" and "continuous answer" cuts from a transcript.

    Args:
        transcript: Segments with ``start``, ``end`` (seconds) and ``text``.
        min_len: Minimum cut duration. Question-led cuts are accepted from
            60% of this value.
        max_len: Duration at which a cut stops growing.
        qmax: Maximum duration of a segment treated as a question.
        gap: Largest silence (seconds) tolerated between consecutive segments.
        lead_in_question: Include the question segment at the start of the cut.
        max_cuts: Maximum number of cuts returned; ``<= 0`` yields no cuts.
        wc_max: Maximum word count of a question.
        qmark_required: Only segments ending in ``?`` count as questions.
        spk_labels: Optional per-segment speaker labels (same length as
            *transcript*).
        interviewer_id: Speaker label of the interviewer; with *spk_labels*,
            only that speaker's segments can be questions.
        debug: Forwarded to ``detect_questions``; otherwise unused.

    Returns:
        A list of dicts with ``start``, ``end``, ``label``, ``hook``,
        ``reason`` and ``segments`` keys, in transcript order.
    """
    if max_cuts <= 0:
        return []

    if spk_labels is not None and interviewer_id is not None:
        qs = set()
        for i, seg in enumerate(transcript):
            st, en = _seg_times(seg)
            d = en - st
            text = (seg.get("text") or "").strip()
            wc = len(text.split())
            has_q = text.endswith("?")
            # Honour qmark_required here too, as detect_questions does.
            looks_like_question = has_q if qmark_required else (d <= qmax or has_q)
            if spk_labels[i] == interviewer_id and wc <= wc_max and looks_like_question:
                qs.add(i)
    else:
        qs = set(detect_questions(transcript, qmax, wc_max, qmark_required, debug))

    cuts: List[Dict[str, Any]] = []
    n = len(transcript)
    i = 0
    while i < n:
        seg = transcript[i]
        st, en = _seg_times(seg)
        d = en - st
        txt = normspace(seg.get("text", ""))
        if not txt or d < 0.2:
            # Skip empty or near-instant segments.
            i += 1
            continue

        if i in qs:
            # Question: gather the answer that follows it.
            j = i + 1
            resp_start = None
            end_time = en
            collected_text = []
            segments = []
            while j < n:
                s2 = transcript[j]
                st2, en2 = _seg_times(s2)
                txt2 = normspace(s2.get("text", ""))
                if j in qs:
                    break  # next question ends this answer; leave it unconsumed
                if en2 - st2 < 0.25:
                    j += 1
                    continue
                if resp_start is not None and st2 - end_time > gap:
                    break  # silence too long; segment j starts something new
                if txt2:
                    if resp_start is None:
                        resp_start = st2
                    segments.append({"start": st2, "end": en2})
                    collected_text.append(txt2)
                    end_time = en2
                j += 1  # segment j is consumed before any max_len stop
                if end_time - (resp_start if resp_start is not None else st) >= max_len:
                    # Stopping after the increment keeps the next cut from
                    # starting on a segment already included in this one.
                    break

            if resp_start is not None:
                start_cut = st if lead_in_question else resp_start
                end_cut = end_time
                dur = end_cut - start_cut
                if dur >= min_len * 0.6:
                    label = first_sentence(" ".join(collected_text), 70) or "Resposta marcante"
                    hook = first_sentence(txt, 90) if lead_in_question else ""
                    cuts.append({
                        "start": round(start_cut, 3),
                        "end": round(end_cut, 3),
                        "label": label,
                        "hook": hook,
                        "reason": "Pergunta curta seguida de resposta longa",
                        "segments": ([{"start": st, "end": en}] if lead_in_question else []) + segments,
                    })
                    if len(cuts) >= max_cuts:
                        break
            i = max(i + 1, j)
            continue

        # Non-question: grow a continuous run of speech starting at segment i.
        j = i + 1
        end_time = en
        collected = [txt] if txt else []
        segments = [{"start": st, "end": en}]
        while j < n and float(transcript[j].get("start", 0)) - end_time <= gap:
            s2 = transcript[j]
            st2, en2 = _seg_times(s2)
            t2 = normspace(s2.get("text", ""))
            if en2 - st2 < 0.25:
                j += 1
                continue
            if t2:
                segments.append({"start": st2, "end": en2})
                collected.append(t2)
                end_time = en2
            j += 1  # consume segment j so it is not reused by the next cut
            if end_time - st >= max_len:
                break

        dur = end_time - st
        if dur >= min_len and collected:
            cuts.append({
                "start": round(st, 3),
                "end": round(end_time, 3),
                "label": first_sentence(" ".join(collected), 70) or "Resposta destacada",
                "hook": "",
                "reason": "Resposta contínua em entrevista",
                "segments": segments,
            })
            if len(cuts) >= max_cuts:
                break
        i = j
    return cuts


def write_shell_and_preview(video_path: Path, base: str, cuts: List[Dict[str, Any]], preview: bool):
    """Write ``<base>_interview_cuts.sh`` with one ffmpeg export command per cut.

    Parts are exported to ``export_parts/`` next to the video. With *preview*
    and at least one cut, a concat list file is also written and a final
    ffmpeg concat command is appended to the script. The script is made
    executable; nothing is executed here.
    """
    out_dir = video_path.parent
    sh_path = out_dir / f"{base}_interview_cuts.sh"
    parts_dir = out_dir / "export_parts"
    parts_dir.mkdir(exist_ok=True)

    lines = ["#!/usr/bin/env bash", "set -e"]
    part_files = []
    for k, c in enumerate(cuts, 1):
        ss = c["start"]
        ee = c["end"]
        dd = round(ee - ss, 3)
        out_file = parts_dir / f"{base}_cut_{k:02}.mp4"
        part_files.append(out_file)
        cmd = (
            f"ffmpeg -hide_banner -loglevel warning -y -ss {ss} -i {shlex.quote(str(video_path))} -t {dd} "
            f"-c:v libx264 -crf 22 -preset veryfast -vf scale=1080:-2:flags=bicubic -c:a aac -b:a 128k {shlex.quote(str(out_file))}"
        )
        lines.append(cmd)

    if preview and cuts:
        plist = out_dir / f"{base}_interview_preview_list.txt"
        with plist.open("w", encoding="utf-8") as f:
            for part in part_files:
                # The concat demuxer resolves relative entries against the
                # list file's directory, so the export_parts/ prefix is
                # required. Entries are single-quoted, with embedded quotes
                # escaped, so names containing spaces parse correctly.
                rel = part.relative_to(out_dir).as_posix()
                escaped = rel.replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        preview_path = out_dir / f"PREVIEW_{base}_interview.mp4"
        lines.append(
            f"ffmpeg -hide_banner -loglevel warning -y -f concat -safe 0 -i {shlex.quote(str(plist))} -c copy {shlex.quote(str(preview_path))}"
        )

    sh_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    os.chmod(sh_path, 0o755)
    print(f"✅ Script de export: {sh_path}")


def main():
    """CLI entry point: load the transcript, build the cuts, write JSON and script."""
    ap = argparse.ArgumentParser("Cortes para entrevistas (pergunta curta + resposta longa)")
    ap.add_argument("video", help="Arquivo de entrada (.mp4/.mov)")
    ap.add_argument("--min", type=float, default=60.0, help="Duração mínima do corte em segundos")
    ap.add_argument("--max", type=float, default=150.0, help="Duração máxima do corte em segundos")
    ap.add_argument("--qmax", type=float, default=12.0, help="Máximo de duração para marcar perguntas")
    ap.add_argument("--gap", type=float, default=2.0, help="Tolerância de gap entre segmentos")
    ap.add_argument("--lead-in-question", choices=["yes", "no"], default="yes", help="Incluir pergunta antes da resposta")
    ap.add_argument("--max-cuts", type=int, default=20, help="Limite de cortes")
    ap.add_argument("--preview", action="store_true", help="Gera comando de prévia por concat")
    ap.add_argument("--q-wc-max", type=int, default=35, help="Máximo de palavras para considerar pergunta")
    ap.add_argument("--qmark-required", action="store_true", help="Exigir '?' para marcar pergunta")
    ap.add_argument("--diarize", action="store_true", help="Ativar diarização com Resemblyzer")
    ap.add_argument("--n-speakers", type=int, default=2, help="Número de falantes para clusterizar")
    ap.add_argument("--debug", action="store_true", help="Imprimir diagnóstico")
    args = ap.parse_args()

    video_path = Path(args.video).expanduser().resolve()
    base = video_path.stem
    transcript_path = video_path.with_name(f"{base}_transcript.json")
    if not transcript_path.exists():
        print(f"ERRO: não achei '{transcript_path.name}'. Gere a transcrição primeiro com video_cuts_offline_mac_plus_subs.py")
        raise SystemExit(1)

    transcript = load_json(transcript_path)

    spk_labels = None
    interviewer_id = None
    if args.diarize:
        # Diarization is best-effort: any failure falls back to text heuristics.
        try:
            wav16k = ensure_wav_16k_mono(video_path)
            diar = diarize_with_resemblyzer(wav16k, n_speakers=args.n_speakers, debug=args.debug)
            if diar:
                spk_labels = assign_speakers_to_transcript(transcript, diar)
                totals = {}
                for i, seg in enumerate(transcript):
                    st = float(seg.get("start", 0))
                    en = float(seg.get("end", 0))
                    d = max(0.0, en - st)
                    spk = spk_labels[i] if spk_labels and i < len(spk_labels) else -1
                    totals[spk] = totals.get(spk, 0.0) + d
                if totals:
                    # The speaker with the least total talk time is taken as the interviewer.
                    interviewer_id = sorted(totals.items(), key=lambda kv: kv[1])[0][0]
        except Exception as e:
            print(f"[warn] Diarização falhou: {e}. Seguindo sem diarização.")

    cuts = build_interview_cuts(
        transcript=transcript,
        min_len=args.min,
        max_len=args.max,
        qmax=args.qmax,
        gap=args.gap,
        lead_in_question=(args.lead_in_question == "yes"),
        max_cuts=args.max_cuts,
        wc_max=args.q_wc_max,
        qmark_required=args.qmark_required,
        spk_labels=spk_labels,
        interviewer_id=interviewer_id,
        debug=args.debug,
    )

    out_json = video_path.with_name(f"{base}_interview_cuts.json")
    save_json(cuts, out_json)
    print(f"✅ Gerado: {out_json}")

    write_shell_and_preview(video_path, base, cuts, preview=args.preview)


if __name__ == "__main__":
    main()