|
|
|
|
|
""" |
|
|
interview_cuts.py — Gera cortes para entrevistas em PT (pergunta curta + resposta longa). |
|
|
|
|
|
Uso típico: |
|
|
python interview_cuts.py video.mp4 --min 60 --max 150 --qmax 12 --gap 2.0 --lead-in-question yes --max-cuts 20 --preview |
|
|
|
|
|
Pré-requisitos: |
|
|
- Ter o arquivo <base>_transcript.json na mesma pasta do vídeo (gerado pelo video_cuts_offline_mac_plus_subs.py). |
|
|
|
|
|
Saídas: |
|
|
- <base>_interview_cuts.json |
|
|
- <base>_interview_cuts.sh |
|
|
- PREVIEW_<base>_interview.mp4 (opcional) |
|
|
""" |
|
|
import argparse, json, os, re, shlex, subprocess, math |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Any |
|
|
|
|
|
# Optional dependencies: numpy and resemblyzer are only required for the
# --diarize feature. Both degrade to None so the rest of the script keeps
# working without them (diarize_with_resemblyzer raises a helpful error).
try:
    import numpy as np
except Exception:
    np = None
try:
    from resemblyzer import VoiceEncoder, preprocess_wav
    _HAVE_RESEMBLYZER = True
except Exception:
    VoiceEncoder = None
    preprocess_wav = None
    _HAVE_RESEMBLYZER = False
|
|
|
|
|
|
|
|
def load_json(p: Path):
    """Read and parse the UTF-8 JSON file at *p*."""
    return json.loads(p.read_text(encoding="utf-8"))
|
|
|
|
|
def save_json(obj, p: Path):
    """Serialize *obj* to *p* as pretty-printed UTF-8 JSON (non-ASCII kept)."""
    payload = json.dumps(obj, ensure_ascii=False, indent=2)
    p.write_text(payload, encoding="utf-8")
|
|
|
|
|
def normspace(s: str) -> str:
    """Trim *s* and collapse every internal whitespace run to one space."""
    trimmed = (s or "").strip()
    # str.split() with no argument splits on any whitespace run, so joining
    # with a single space is equivalent to re.sub(r"\s+", " ", trimmed).
    return " ".join(trimmed.split())
|
|
|
|
|
def first_sentence(s: str, limit=120) -> str:
    """Return the first sentence of *s* (whitespace-normalized), cut to *limit* chars."""
    # Inline whitespace normalization (same contract as normspace()).
    cleaned = " ".join((s or "").strip().split())
    # Split after sentence-ending punctuation followed by whitespace.
    pieces = re.split(r"(?<=[.!?])\s+", cleaned)
    head = pieces[0] if pieces and pieces[0] else cleaned
    return head[:limit].rstrip()
|
|
|
|
|
|
|
|
def ensure_wav_16k_mono(video_path: Path) -> Path:
    """Export a temporary 16 kHz mono WAV next to the video if not present.

    The file is named <video>.16k.wav and reused on subsequent runs.
    Raises subprocess.CalledProcessError if ffmpeg fails.
    """
    wav_path = video_path.with_suffix(".16k.wav")
    if not wav_path.exists():
        command = [
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-ac", "1",        # mono
            "-ar", "16000",    # 16 kHz sample rate
            str(wav_path),
        ]
        subprocess.run(command, check=True)
    return wav_path
|
|
|
|
|
|
|
|
def diarize_with_resemblyzer(wav_path: Path, n_speakers: int = 2, debug: bool = False):
    """Lightweight diarization using Resemblyzer.

    Embeds the audio into partial utterance embeddings, clusters them with
    agglomerative clustering, then merges consecutive same-cluster windows
    into speaker segments.

    Args:
        wav_path: path to a WAV file (expected 16 kHz mono, see
            ensure_wav_16k_mono; preprocess_wav resamples anyway).
        n_speakers: requested cluster count (floored at 2 below).
        debug: accepted for signature compatibility; unused here.

    Returns:
        List of {"start", "end", "spk"} dicts in seconds (rounded to ms),
        or [] when the audio is empty or yields fewer than 2 embeddings.

    Raises:
        RuntimeError: if resemblyzer/numpy/scikit-learn are not installed.
    """
    if not _HAVE_RESEMBLYZER or np is None:
        raise RuntimeError("pip install resemblyzer numpy scikit-learn soundfile")
    try:
        from sklearn.cluster import AgglomerativeClustering
    except Exception:
        raise RuntimeError("pip install scikit-learn")

    wav = preprocess_wav(str(wav_path))
    enc = VoiceEncoder()
    _, partial_embeds, partial_slices = enc.embed_utterance(wav, return_partials=True)
    sr = 16000.0  # preprocess_wav output rate assumed — TODO confirm against resemblyzer version
    duration = float(len(wav)) / sr if len(wav) > 0 else 0.0
    if len(partial_embeds) == 0 or duration <= 0.0:
        return []
    half = 0.8  # assumed half-width (s) of each partial embedding window
    n_parts = len(partial_embeds)
    # Spread window centers evenly over the audio instead of using
    # partial_slices directly. NOTE(review): when duration <= 2*half this
    # produces a single center even if n_parts > 1; the indexed access
    # partial_times[i] below assumes one center per label — verify behavior
    # on clips shorter than ~1.6 s.
    partial_times = np.array([duration/2.0], dtype=float) if duration <= 2*half else np.linspace(half, duration - half, n_parts)
    n_samples = len(partial_embeds)
    if n_samples < 2:
        return []
    X = np.vstack(partial_embeds)
    n_speakers = max(2, int(n_speakers))
    # Never ask for more clusters than samples.
    n_clusters = max(2, min(n_speakers, X.shape[0]))
    labels = AgglomerativeClustering(n_clusters=n_clusters).fit_predict(X)
    # Merge consecutive windows sharing a cluster label into one segment.
    segs = []
    cur_spk = int(labels[0])
    cur_start = max(0.0, float(partial_times[0] - half))
    cur_end = float(partial_times[0] + half)
    for i in range(1, len(labels)):
        spk = int(labels[i])
        st = float(partial_times[i] - half)
        en = float(partial_times[i] + half)
        if spk == cur_spk and st <= cur_end + 0.1:
            # Same speaker and (nearly) contiguous: extend the open segment.
            cur_end = max(cur_end, en)
        else:
            # Speaker change or gap: close the current segment (forcing a
            # minimum 0.1 s length) and open a new one.
            segs.append({"start": round(max(0.0, cur_start), 3), "end": round(max(cur_end, cur_start+0.1), 3), "spk": cur_spk})
            cur_spk = spk
            cur_start = st
            cur_end = en
    segs.append({"start": round(max(0.0, cur_start), 3), "end": round(max(cur_end, cur_start+0.1), 3), "spk": cur_spk})
    return segs
|
|
|
|
|
|
|
|
def assign_speakers_to_transcript(transcript: List[Dict[str, Any]], diar_segs: List[Dict[str, Any]]):
    """Label each transcript segment with the diarized speaker at its midpoint.

    Returns one speaker id per transcript segment; -1 when *diar_segs* is empty.
    """
    def _speaker_at(t: float):
        # Direct hit: midpoint inside a diarized span (0.1 s slack each side).
        for d in diar_segs:
            if d["start"] - 0.1 <= t <= d["end"] + 0.1:
                return d["spk"]
        # No containing span: fall back to the span whose center is nearest.
        if not diar_segs:
            return -1
        nearest = min(diar_segs, key=lambda d: abs((d["start"] + d["end"]) / 2 - t))
        return nearest["spk"]

    labels = []
    for seg in transcript:
        midpoint = (float(seg.get("start", 0)) + float(seg.get("end", 0))) / 2.0
        labels.append(_speaker_at(midpoint))
    return labels
|
|
|
|
|
|
|
|
def detect_questions(transcript: List[Dict[str, Any]], qmax: float, wc_max: int, qmark_required: bool, debug: bool=False) -> List[int]:
    """Return indices of transcript segments that look like short questions.

    A segment qualifies when its word count is in [2, wc_max] AND it either
    ends with '?' or lasts at most *qmax* seconds. With *qmark_required*,
    the trailing '?' is mandatory and duration is ignored.
    """
    hits: List[int] = []
    for idx, seg in enumerate(transcript):
        start = float(seg.get("start", 0))
        end = float(seg.get("end", 0))
        length = max(0.0, end - start)
        text = (seg.get("text") or "").strip()
        words = len(text.split())
        short_enough = 2 <= words <= wc_max
        ends_qmark = text.endswith("?")
        if qmark_required:
            candidate = ends_qmark and short_enough
        else:
            candidate = short_enough and (ends_qmark or length <= qmax)
        if candidate:
            hits.append(idx)
    return hits
|
|
|
|
|
|
|
|
def build_interview_cuts(
    transcript: List[Dict[str, Any]],
    min_len: float,
    max_len: float,
    qmax: float,
    gap: float,
    lead_in_question: bool,
    max_cuts: int,
    wc_max: int = 35,
    qmark_required: bool = False,
    spk_labels: List[int] | None = None,
    interviewer_id: int | None = None,
    debug: bool = False,
) -> List[Dict[str, Any]]:
    """Build interview cut candidates from a transcript.

    Two strategies, applied in a single left-to-right scan:
      * question-anchored: a "question" segment followed by contiguous
        answer segments (within *gap* seconds of silence) becomes one cut;
      * fallback run: outside questions, any continuous run of speech long
        enough on its own becomes a cut.

    Args:
        transcript: list of {"start", "end", "text"} segments (seconds).
        min_len / max_len: target duration window for a cut, in seconds.
        qmax: max duration for a segment to count as a question.
        gap: max silence tolerated between consecutive answer segments.
        lead_in_question: include the question segment at the cut start.
        max_cuts: stop scanning once this many cuts were produced.
        wc_max: max word count for a question segment.
        qmark_required: if True, only segments ending in '?' are questions.
        spk_labels / interviewer_id: optional diarization output; when both
            are given, questions are limited to the interviewer's segments.
        debug: forwarded to detect_questions; unused otherwise.

    Returns:
        List of {"start", "end", "label", "hook", "reason", "segments"} dicts.
    """
    # Question indices: prefer diarization (short interviewer segments);
    # otherwise use the text/duration heuristic.
    if spk_labels is not None and interviewer_id is not None:
        qs = set()
        for i, seg in enumerate(transcript):
            st = float(seg.get("start", 0)); en = float(seg.get("end", 0)); d = en - st
            text = (seg.get("text") or "").strip()
            wc = len(text.split())
            has_q = text.endswith("?")
            if spk_labels[i] == interviewer_id and wc <= wc_max and (d <= qmax or has_q):
                qs.add(i)
    else:
        qs = set(detect_questions(transcript, qmax, wc_max, qmark_required, debug))
    cuts = []
    n = len(transcript)
    i = 0
    while i < n:
        seg = transcript[i]
        st = float(seg.get("start", 0)); en = float(seg.get("end", 0)); d = en - st
        txt = normspace(seg.get("text", ""))
        if not txt or d < 0.2:
            # Skip empty or near-zero-length segments.
            i += 1; continue
        if i in qs:
            # Question anchor found: gather the answer that follows.
            j = i + 1
            resp_start = None   # start time of the first answer segment
            end_time = en       # running end of the gathered material
            collected_text = []
            segments = []
            while j < n:
                s2 = transcript[j]
                st2 = float(s2.get("start", 0)); en2 = float(s2.get("end", 0)); d2 = en2 - st2
                txt2 = normspace(s2.get("text", ""))
                if j in qs:
                    # Next question reached — the answer ends here.
                    break
                if d2 < 0.25:
                    # Ignore micro-segments but keep scanning.
                    j += 1
                    continue
                if resp_start is not None and st2 - end_time > gap:
                    # Silence longer than the tolerance: close the answer.
                    break
                if txt2:
                    if resp_start is None:
                        resp_start = st2
                    segments.append({"start": st2, "end": en2})
                    collected_text.append(txt2)
                    end_time = en2
                if end_time - (resp_start if resp_start is not None else st) >= max_len:
                    # Answer (or question-to-now span) hit the length cap.
                    break
                j += 1
            if resp_start is not None:
                start_cut = st if lead_in_question else resp_start
                end_cut = end_time
                dur = end_cut - start_cut
                # Question-anchored cuts may be a bit short: 60% of min_len.
                if dur >= min_len * 0.6:
                    label = first_sentence(" ".join(collected_text), 70) or "Resposta marcante"
                    hook = first_sentence(txt, 90) if lead_in_question else ""
                    cuts.append({
                        "start": round(start_cut, 3),
                        "end": round(end_cut, 3),
                        "label": label,
                        "hook": hook,
                        "reason": "Pergunta curta seguida de resposta longa",
                        "segments": ([{"start": st, "end": en}] if lead_in_question else []) + segments
                    })
                    if len(cuts) >= max_cuts:
                        break
            # Resume after the consumed answer (j may equal i+1 if none).
            i = max(i + 1, j)
            continue
        else:
            # No question anchor: accumulate a continuous run of speech
            # starting at segment i, tolerating gaps up to *gap* seconds.
            j = i + 1
            end_time = en
            collected = [txt] if txt else []
            segments = [{"start": st, "end": en}]
            while j < n and float(transcript[j].get("start",0)) - end_time <= gap:
                s2 = transcript[j]
                st2 = float(s2.get("start", 0)); en2 = float(s2.get("end", 0))
                t2 = normspace(s2.get("text", ""))
                if en2 - st2 < 0.25:
                    j += 1
                    continue
                if t2:
                    segments.append({"start": st2, "end": en2})
                    collected.append(t2)
                    end_time = en2
                if end_time - st >= max_len:
                    break
                j += 1
            dur = end_time - st
            # Fallback runs must satisfy the full min_len (no 60% discount).
            if dur >= min_len and collected:
                cuts.append({
                    "start": round(st, 3),
                    "end": round(end_time, 3),
                    "label": first_sentence(" ".join(collected), 70) or "Resposta destacada",
                    "hook": "",
                    "reason": "Resposta contínua em entrevista",
                    "segments": segments
                })
                if len(cuts) >= max_cuts:
                    break
            i = j
            continue
    return cuts
|
|
|
|
|
|
|
|
def write_shell_and_preview(video_path: Path, base: str, cuts: List[Dict[str, Any]], preview: bool):
    """Write the ffmpeg export shell script (and optional preview concat step).

    Creates <base>_interview_cuts.sh next to the video with one ffmpeg command
    per cut (output files land in ./export_parts/). When *preview* is True,
    also writes a concat list and appends a command that stitches all cuts
    into PREVIEW_<base>_interview.mp4.

    Args:
        video_path: source video file (used as ffmpeg input path).
        base: basename used for all generated artifacts.
        cuts: list of {"start", "end", ...} dicts (seconds).
        preview: whether to emit the concat preview step.
    """
    out_dir = video_path.parent
    sh_path = out_dir / f"{base}_interview_cuts.sh"
    parts_dir = out_dir / "export_parts"
    parts_dir.mkdir(exist_ok=True)

    lines = ["#!/usr/bin/env bash", "set -e"]
    for k, c in enumerate(cuts, 1):
        ss = c["start"]; ee = c["end"]; dd = round(ee - ss, 3)
        out_file = parts_dir / f"{base}_cut_{k:02}.mp4"
        cmd = (
            f"ffmpeg -hide_banner -loglevel warning -y -ss {ss} -i {shlex.quote(str(video_path))} -t {dd} "
            f"-c:v libx264 -crf 22 -preset veryfast -vf scale=1080:-2:flags=bicubic -c:a aac -b:a 128k {shlex.quote(str(out_file))}"
        )
        lines.append(cmd)
    if preview and cuts:
        plist = out_dir / f"{base}_interview_preview_list.txt"
        with plist.open("w", encoding="utf-8") as f:
            for k in range(1, len(cuts) + 1):
                p = parts_dir / f"{base}_cut_{k:02}.mp4"
                # BUG FIX: ffmpeg's concat demuxer resolves relative entries
                # against the directory of the LIST file (out_dir), but the
                # cut files live in out_dir/export_parts/. Bare filenames
                # therefore failed with "No such file or directory"; keep the
                # subdirectory prefix.
                f.write(f"file '{parts_dir.name}/{p.name}'\n")
        preview_path = out_dir / f"PREVIEW_{base}_interview.mp4"
        lines.append(f"ffmpeg -hide_banner -loglevel warning -y -f concat -safe 0 -i {shlex.quote(str(plist))} -c copy {shlex.quote(str(preview_path))}")

    sh_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    os.chmod(sh_path, 0o755)  # make the script directly executable
    print(f"✅ Script de export: {sh_path}")
|
|
|
|
|
|
|
|
def main():
    """CLI entry point: load transcript, optionally diarize, emit cuts + script."""
    ap = argparse.ArgumentParser("Cortes para entrevistas (pergunta curta + resposta longa)")
    ap.add_argument("video", help="Arquivo de entrada (.mp4/.mov)")
    ap.add_argument("--min", type=float, default=60.0, help="Duração mínima do corte em segundos")
    ap.add_argument("--max", type=float, default=150.0, help="Duração máxima do corte em segundos")
    ap.add_argument("--qmax", type=float, default=12.0, help="Máximo de duração para marcar perguntas")
    ap.add_argument("--gap", type=float, default=2.0, help="Tolerância de gap entre segmentos")
    ap.add_argument("--lead-in-question", choices=["yes","no"], default="yes", help="Incluir pergunta antes da resposta")
    ap.add_argument("--max-cuts", type=int, default=20, help="Limite de cortes")
    ap.add_argument("--preview", action="store_true", help="Gera comando de prévia por concat")
    ap.add_argument("--q-wc-max", type=int, default=35, help="Máximo de palavras para considerar pergunta")
    ap.add_argument("--qmark-required", action="store_true", help="Exigir '?' para marcar pergunta")
    ap.add_argument("--diarize", action="store_true", help="Ativar diarização com Resemblyzer")
    ap.add_argument("--n-speakers", type=int, default=2, help="Número de falantes para clusterizar")
    ap.add_argument("--debug", action="store_true", help="Imprimir diagnóstico")
    args = ap.parse_args()

    video_path = Path(args.video).expanduser().resolve()
    base = video_path.stem
    # The transcript must already exist next to the video (see module docstring).
    transcript_path = video_path.with_name(f"{base}_transcript.json")
    if not transcript_path.exists():
        print(f"ERRO: não achei '{transcript_path.name}'. Gere a transcrição primeiro com video_cuts_offline_mac_plus_subs.py")
        raise SystemExit(1)

    transcript = load_json(transcript_path)

    spk_labels = None
    interviewer_id = None
    if args.diarize:
        try:
            wav16k = ensure_wav_16k_mono(video_path)
            diar = diarize_with_resemblyzer(wav16k, n_speakers=args.n_speakers, debug=args.debug)
            if diar:
                spk_labels = assign_speakers_to_transcript(transcript, diar)
                # Heuristic: the interviewer is the speaker with the LEAST
                # total talk time across the transcript.
                totals = {}
                for i, seg in enumerate(transcript):
                    st = float(seg.get("start",0)); en = float(seg.get("end",0)); d = max(0.0, en-st)
                    spk = spk_labels[i] if spk_labels and i < len(spk_labels) else -1
                    totals[spk] = totals.get(spk, 0.0) + d
                if totals:
                    interviewer_id = sorted(totals.items(), key=lambda kv: kv[1])[0][0]
        except Exception as e:
            # Diarization is best-effort: on any failure fall back to the
            # text/duration question heuristic.
            print(f"[warn] Diarização falhou: {e}. Seguindo sem diarização.")

    cuts = build_interview_cuts(
        transcript=transcript,
        min_len=args.min,
        max_len=args.max,
        qmax=args.qmax,
        gap=args.gap,
        lead_in_question=(args.lead_in_question=="yes"),
        max_cuts=args.max_cuts,
        wc_max=args.q_wc_max,
        qmark_required=args.qmark_required,
        spk_labels=spk_labels,
        interviewer_id=interviewer_id,
        debug=args.debug,
    )

    out_json = video_path.with_name(f"{base}_interview_cuts.json")
    save_json(cuts, out_json)
    print(f"✅ Gerado: {out_json}")

    write_shell_and_preview(video_path, base, cuts, preview=args.preview)
|
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|
|
|