#!/usr/bin/env python3
"""
interview_cuts.py — Gera cortes para entrevistas em PT (pergunta curta + resposta longa).
Uso típico:
python interview_cuts.py video.mp4 --min 60 --max 150 --qmax 12 --gap 2.0 --lead-in-question yes --max-cuts 20 --preview
Pré-requisitos:
- Ter o arquivo _transcript.json na mesma pasta do vídeo (gerado pelo video_cuts_offline_mac_plus_subs.py).
Saídas:
- _interview_cuts.json
- _interview_cuts.sh
- PREVIEW__interview.mp4 (opcional)
"""
import argparse, json, os, re, shlex, subprocess, math
from pathlib import Path
from typing import List, Dict, Any
try:
import numpy as np
except Exception:
np = None
try:
from resemblyzer import VoiceEncoder, preprocess_wav
_HAVE_RESEMBLYZER = True
except Exception:
VoiceEncoder = None
preprocess_wav = None
_HAVE_RESEMBLYZER = False
def load_json(p: Path):
    """Read a UTF-8 encoded JSON file and return the parsed object."""
    with p.open("r", encoding="utf-8") as fh:
        return json.load(fh)
def save_json(obj, p: Path):
    """Write *obj* to *p* as pretty-printed, non-ASCII-escaped UTF-8 JSON."""
    payload = json.dumps(obj, ensure_ascii=False, indent=2)
    p.write_text(payload, encoding="utf-8")
def normspace(s: str) -> str:
    """Collapse every whitespace run to a single space and trim the ends.

    ``None`` (or any falsy value) is treated as the empty string.
    """
    cleaned = (s or "").strip()
    return re.sub(r"\s+", " ", cleaned)
def first_sentence(s: str, limit=120) -> str:
    """Return the first sentence of *s* (split after `.`, `!` or `?`),
    truncated to at most *limit* characters with trailing spaces removed."""
    text = normspace(s)
    pieces = re.split(r"(?<=[.!?])\s+", text)
    head = pieces[0] if pieces and pieces[0] else text
    return head[:limit].rstrip()
def ensure_wav_16k_mono(video_path: Path) -> Path:
    """Export a temporary 16 kHz mono WAV next to the video if not present.

    Returns the path to ``<stem>.16k.wav``; re-uses an existing file and
    otherwise shells out to ffmpeg (raises CalledProcessError on failure).
    """
    wav_path = video_path.with_suffix(".16k.wav")
    if not wav_path.exists():
        ffmpeg_cmd = ["ffmpeg", "-y", "-i", str(video_path), "-ac", "1", "-ar", "16000", str(wav_path)]
        subprocess.run(ffmpeg_cmd, check=True)
    return wav_path
def diarize_with_resemblyzer(wav_path: Path, n_speakers: int = 2, debug: bool = False):
    """Lightweight speaker diarization using Resemblyzer partial embeddings.

    Embeds the waveform in overlapping windows, clusters the window
    embeddings with agglomerative clustering into (at least 2, at most
    *n_speakers*) groups, then merges adjacent same-label windows into
    contiguous segments.

    Returns:
        A list of ``{"start": float, "end": float, "spk": int}`` dicts
        (seconds, rounded to 3 decimals), or ``[]`` when the audio is too
        short to produce at least two partial embeddings.

    Raises:
        RuntimeError: when resemblyzer / numpy / scikit-learn are missing.
    """
    if not _HAVE_RESEMBLYZER or np is None:
        raise RuntimeError("pip install resemblyzer numpy scikit-learn soundfile")
    try:
        from sklearn.cluster import AgglomerativeClustering
    except Exception:
        raise RuntimeError("pip install scikit-learn")
    wav = preprocess_wav(str(wav_path))
    enc = VoiceEncoder()
    _, partial_embeds, partial_slices = enc.embed_utterance(wav, return_partials=True)
    sr = 16000.0  # preprocess_wav resamples to 16 kHz
    duration = float(len(wav)) / sr if len(wav) > 0 else 0.0
    if len(partial_embeds) == 0 or duration <= 0.0:
        return []
    half = 0.8  # assumed half-width (s) of each partial window — TODO confirm against resemblyzer's rate
    n_parts = len(partial_embeds)
    if duration <= 2 * half:
        # Audio shorter than one full window: center every partial at the
        # midpoint. BUGFIX: this array must have n_parts entries — the old
        # single-element array raised IndexError below when n_parts >= 2.
        partial_times = np.full(n_parts, duration / 2.0, dtype=float)
    else:
        partial_times = np.linspace(half, duration - half, n_parts)
    if n_parts < 2:
        # Cannot cluster a single embedding.
        return []
    X = np.vstack(partial_embeds)
    n_speakers = max(2, int(n_speakers))
    n_clusters = max(2, min(n_speakers, X.shape[0]))
    labels = AgglomerativeClustering(n_clusters=n_clusters).fit_predict(X)
    # Merge consecutive windows that share a speaker label (with 0.1 s slack).
    segs = []
    cur_spk = int(labels[0])
    cur_start = max(0.0, float(partial_times[0] - half))
    cur_end = float(partial_times[0] + half)
    for i in range(1, len(labels)):
        spk = int(labels[i])
        st = float(partial_times[i] - half)
        en = float(partial_times[i] + half)
        if spk == cur_spk and st <= cur_end + 0.1:
            cur_end = max(cur_end, en)
        else:
            segs.append({"start": round(max(0.0, cur_start), 3),
                         "end": round(max(cur_end, cur_start + 0.1), 3),
                         "spk": cur_spk})
            cur_spk = spk
            cur_start = st
            cur_end = en
    segs.append({"start": round(max(0.0, cur_start), 3),
                 "end": round(max(cur_end, cur_start + 0.1), 3),
                 "spk": cur_spk})
    return segs
def assign_speakers_to_transcript(transcript: List[Dict[str, Any]], diar_segs: List[Dict[str, Any]]):
    """Assign a speaker id to every transcript segment via its midpoint.

    A segment gets the speaker of the first diarization segment covering
    its midpoint (with 0.1 s slack on both sides); otherwise the nearest
    diarization segment by center distance; -1 when *diar_segs* is empty.
    """
    def lookup(t: float):
        for d in diar_segs:
            if d["start"] - 0.1 <= t <= d["end"] + 0.1:
                return d["spk"]
        if not diar_segs:
            return -1
        nearest = min(diar_segs, key=lambda d: abs((d["start"] + d["end"]) / 2 - t))
        return nearest["spk"]
    labels = []
    for seg in transcript:
        midpoint = (float(seg.get("start", 0)) + float(seg.get("end", 0))) / 2.0
        labels.append(lookup(midpoint))
    return labels
def detect_questions(transcript: List[Dict[str, Any]], qmax: float, wc_max: int, qmark_required: bool, debug: bool=False) -> List[int]:
    """Return the indices of transcript segments that look like questions.

    A segment qualifies when its word count is within [2, wc_max] and it
    either ends with '?' or — unless *qmark_required* — lasts at most
    *qmax* seconds.
    """
    question_idxs: List[int] = []
    for idx, seg in enumerate(transcript):
        stripped = (seg.get("text") or "").strip()
        n_words = len(stripped.split())
        if not (2 <= n_words <= wc_max):
            continue
        ends_qmark = stripped.endswith("?")
        if qmark_required:
            if ends_qmark:
                question_idxs.append(idx)
            continue
        length = max(0.0, float(seg.get("end", 0)) - float(seg.get("start", 0)))
        if ends_qmark or length <= qmax:
            question_idxs.append(idx)
    return question_idxs
def build_interview_cuts(
    transcript: List[Dict[str, Any]],
    min_len: float,
    max_len: float,
    qmax: float,
    gap: float,
    lead_in_question: bool,
    max_cuts: int,
    wc_max: int = 35,
    qmark_required: bool = False,
    spk_labels: List[int] | None = None,
    interviewer_id: int | None = None,
    debug: bool = False,
) -> List[Dict[str, Any]]:
    """Build interview cut windows from a transcript.

    Two strategies, chosen per segment:
      * Question branch — when the segment is a detected question, collect
        the following non-question segments (at most `gap` seconds apart,
        total capped at `max_len`) as the answer; the cut optionally starts
        at the question itself (`lead_in_question`).
      * Continuous branch — otherwise, collect a continuous run of segments
        separated by at most `gap` seconds as a stand-alone answer cut.

    Returns a list of dicts with rounded start/end, label, hook, reason
    and the raw transcript spans ("segments") that make up each cut.
    Stops once `max_cuts` cuts have been produced.
    """
    # Determine question indices. With diarization data, any short (or
    # '?'-terminated) utterance by the interviewer counts as a question;
    # otherwise fall back to the text/duration heuristics.
    if spk_labels is not None and interviewer_id is not None:
        qs = set()
        for i, seg in enumerate(transcript):
            st = float(seg.get("start", 0)); en = float(seg.get("end", 0)); d = en - st
            text = (seg.get("text") or "").strip()
            wc = len(text.split())
            has_q = text.endswith("?")
            if spk_labels[i] == interviewer_id and wc <= wc_max and (d <= qmax or has_q):
                qs.add(i)
    else:
        qs = set(detect_questions(transcript, qmax, wc_max, qmark_required, debug))
    cuts = []
    n = len(transcript)
    i = 0
    while i < n:
        seg = transcript[i]
        st = float(seg.get("start", 0)); en = float(seg.get("end", 0)); d = en - st
        txt = normspace(seg.get("text", ""))
        # Skip empty or near-instant segments.
        if not txt or d < 0.2:
            i += 1; continue
        if i in qs:
            # --- Question branch: gather the answer that follows. ---
            j = i + 1
            resp_start = None  # start time of the first answer segment, if any
            end_time = en
            collected_text = []
            segments = []
            while j < n:
                s2 = transcript[j]
                st2 = float(s2.get("start", 0)); en2 = float(s2.get("end", 0)); d2 = en2 - st2
                txt2 = normspace(s2.get("text", ""))
                if j in qs:
                    # Next question starts: the answer window ends here.
                    break
                if d2 < 0.25:
                    # Ignore very short blips without advancing end_time.
                    j += 1
                    continue
                if resp_start is not None and st2 - end_time > gap:
                    # Silence longer than the allowed gap ends the answer.
                    break
                if txt2:
                    if resp_start is None:
                        resp_start = st2
                    segments.append({"start": st2, "end": en2})
                    collected_text.append(txt2)
                    end_time = en2
                if end_time - (resp_start if resp_start is not None else st) >= max_len:
                    break
                j += 1
            if resp_start is not None:
                start_cut = st if lead_in_question else resp_start
                end_cut = end_time
                dur = end_cut - start_cut
                # Q+A cuts are accepted down to 60% of min_len.
                if dur >= min_len * 0.6:
                    label = first_sentence(" ".join(collected_text), 70) or "Resposta marcante"
                    hook = first_sentence(txt, 90) if lead_in_question else ""
                    cuts.append({
                        "start": round(start_cut, 3),
                        "end": round(end_cut, 3),
                        "label": label,
                        "hook": hook,
                        "reason": "Pergunta curta seguida de resposta longa",
                        "segments": ([{"start": st, "end": en}] if lead_in_question else []) + segments
                    })
                if len(cuts) >= max_cuts:
                    break
            # Resume scanning after the consumed answer (at least advance by one).
            i = max(i + 1, j)
            continue
        else:
            # --- Continuous branch: collect an uninterrupted answer run. ---
            j = i + 1
            end_time = en
            collected = [txt] if txt else []
            segments = [{"start": st, "end": en}]
            while j < n and float(transcript[j].get("start",0)) - end_time <= gap:
                s2 = transcript[j]
                st2 = float(s2.get("start", 0)); en2 = float(s2.get("end", 0))
                t2 = normspace(s2.get("text", ""))
                if en2 - st2 < 0.25:
                    j += 1
                    continue
                if t2:
                    segments.append({"start": st2, "end": en2})
                    collected.append(t2)
                    end_time = en2
                if end_time - st >= max_len:
                    break
                j += 1
            dur = end_time - st
            if dur >= min_len and collected:
                cuts.append({
                    "start": round(st, 3),
                    "end": round(end_time, 3),
                    "label": first_sentence(" ".join(collected), 70) or "Resposta destacada",
                    "hook": "",
                    "reason": "Resposta contínua em entrevista",
                    "segments": segments
                })
            if len(cuts) >= max_cuts:
                break
            i = j
            continue
    return cuts
def write_shell_and_preview(video_path: Path, base: str, cuts: List[Dict[str, Any]], preview: bool):
    """Write the ffmpeg export shell script (and optional preview concat).

    Creates ``<base>_interview_cuts.sh`` next to the video, with one
    re-encode command per cut writing into ``export_parts/``. When
    *preview* is True, also writes a concat list file and appends a final
    command that stitches every part into ``PREVIEW_<base>_interview.mp4``.
    The script is made executable (0o755).
    """
    out_dir = video_path.parent
    sh_path = out_dir / f"{base}_interview_cuts.sh"
    parts_dir = out_dir / "export_parts"
    parts_dir.mkdir(exist_ok=True)
    lines = ["#!/usr/bin/env bash", "set -e"]
    for k, c in enumerate(cuts, 1):
        ss = c["start"]; ee = c["end"]; dd = round(ee - ss, 3)
        out_file = parts_dir / f"{base}_cut_{k:02}.mp4"
        cmd = (
            f"ffmpeg -hide_banner -loglevel warning -y -ss {ss} -i {shlex.quote(str(video_path))} -t {dd} "
            f"-c:v libx264 -crf 22 -preset veryfast -vf scale=1080:-2:flags=bicubic -c:a aac -b:a 128k {shlex.quote(str(out_file))}"
        )
        lines.append(cmd)
    if preview and cuts:
        plist = out_dir / f"{base}_interview_preview_list.txt"
        with plist.open("w", encoding="utf-8") as f:
            for k in range(1, len(cuts) + 1):
                p = parts_dir / f"{base}_cut_{k:02}.mp4"
                # BUGFIX: ffmpeg's concat demuxer resolves entries relative to
                # the list file (which lives in out_dir), while the parts live
                # in export_parts/ — the directory must be part of the entry.
                # Single quotes protect names containing spaces.
                f.write(f"file '{parts_dir.name}/{p.name}'\n")
        preview_path = out_dir / f"PREVIEW_{base}_interview.mp4"
        lines.append(f"ffmpeg -hide_banner -loglevel warning -y -f concat -safe 0 -i {shlex.quote(str(plist))} -c copy {shlex.quote(str(preview_path))}")
    sh_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    os.chmod(sh_path, 0o755)
    print(f"✅ Script de export: {sh_path}")
def main():
    """CLI entry point: parse arguments, load the sibling transcript JSON,
    optionally diarize to find the interviewer, build interview cuts and
    write the cuts JSON plus the ffmpeg export script."""
    ap = argparse.ArgumentParser("Cortes para entrevistas (pergunta curta + resposta longa)")
    ap.add_argument("video", help="Arquivo de entrada (.mp4/.mov)")
    ap.add_argument("--min", type=float, default=60.0, help="Duração mínima do corte em segundos")
    ap.add_argument("--max", type=float, default=150.0, help="Duração máxima do corte em segundos")
    ap.add_argument("--qmax", type=float, default=12.0, help="Máximo de duração para marcar perguntas")
    ap.add_argument("--gap", type=float, default=2.0, help="Tolerância de gap entre segmentos")
    ap.add_argument("--lead-in-question", choices=["yes","no"], default="yes", help="Incluir pergunta antes da resposta")
    ap.add_argument("--max-cuts", type=int, default=20, help="Limite de cortes")
    ap.add_argument("--preview", action="store_true", help="Gera comando de prévia por concat")
    ap.add_argument("--q-wc-max", type=int, default=35, help="Máximo de palavras para considerar pergunta")
    ap.add_argument("--qmark-required", action="store_true", help="Exigir '?' para marcar pergunta")
    ap.add_argument("--diarize", action="store_true", help="Ativar diarização com Resemblyzer")
    ap.add_argument("--n-speakers", type=int, default=2, help="Número de falantes para clusterizar")
    ap.add_argument("--debug", action="store_true", help="Imprimir diagnóstico")
    args = ap.parse_args()
    video_path = Path(args.video).expanduser().resolve()
    base = video_path.stem
    # The transcript must already exist next to the video (produced by the
    # companion script); exit with status 1 otherwise.
    transcript_path = video_path.with_name(f"{base}_transcript.json")
    if not transcript_path.exists():
        print(f"ERRO: não achei '{transcript_path.name}'. Gere a transcrição primeiro com video_cuts_offline_mac_plus_subs.py")
        raise SystemExit(1)
    transcript = load_json(transcript_path)
    spk_labels = None
    interviewer_id = None
    if args.diarize:
        # Best-effort diarization: any failure falls back to text heuristics.
        try:
            wav16k = ensure_wav_16k_mono(video_path)
            diar = diarize_with_resemblyzer(wav16k, n_speakers=args.n_speakers, debug=args.debug)
            if diar:
                spk_labels = assign_speakers_to_transcript(transcript, diar)
                # Accumulate total speaking time per speaker id.
                totals = {}
                for i, seg in enumerate(transcript):
                    st = float(seg.get("start",0)); en = float(seg.get("end",0)); d = max(0.0, en-st)
                    spk = spk_labels[i] if spk_labels and i < len(spk_labels) else -1
                    totals[spk] = totals.get(spk, 0.0) + d
                if totals:
                    # Heuristic: the interviewer is the speaker with the LEAST
                    # total talk time.
                    interviewer_id = sorted(totals.items(), key=lambda kv: kv[1])[0][0]
        except Exception as e:
            print(f"[warn] Diarização falhou: {e}. Seguindo sem diarização.")
    cuts = build_interview_cuts(
        transcript=transcript,
        min_len=args.min,
        max_len=args.max,
        qmax=args.qmax,
        gap=args.gap,
        lead_in_question=(args.lead_in_question=="yes"),
        max_cuts=args.max_cuts,
        wc_max=args.q_wc_max,
        qmark_required=args.qmark_required,
        spk_labels=spk_labels,
        interviewer_id=interviewer_id,
        debug=args.debug,
    )
    out_json = video_path.with_name(f"{base}_interview_cuts.json")
    save_json(cuts, out_json)
    print(f"✅ Gerado: {out_json}")
    write_shell_and_preview(video_path, base, cuts, preview=args.preview)
# Script entry point.
if __name__ == "__main__":
    main()