leicam committed on
Commit
577ac99
·
verified ·
1 Parent(s): f7ef0c3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +90 -41
app.py CHANGED
@@ -15,6 +15,7 @@ import tempfile
15
  import os
16
  import shutil
17
  import json
 
18
 
19
  # ======================= DATACLASSES =======================
20
 
@@ -42,29 +43,23 @@ class FaceBox:
42
  # ======================= UTILS =======================
43
 
44
  def resolve_video_path(v: Union[str, dict, None]) -> Optional[str]:
45
- """Gradio às vezes entrega str (caminho) ou dict {'name':..., 'data':...}. Normaliza para caminho."""
46
  if v is None:
47
  return None
48
  if isinstance(v, str):
49
  return v
50
  if isinstance(v, dict):
51
- # Prioriza caminho local temporário
52
- if "name" in v and isinstance(v["name"], str) and len(v["name"]) > 0 and os.path.exists(v["name"]):
53
  return v["name"]
54
- # Algumas versões usam 'path'
55
  if "path" in v and isinstance(v["path"], str) and os.path.exists(v["path"]):
56
  return v["path"]
57
- # Fallback: alguns frontends mandam apenas nome base; não há como resolver sem arquivo
58
  return v.get("name") or v.get("path")
59
  return None
60
 
61
  def probe_duration(path: str) -> Optional[float]:
62
- """Retorna a duração (segundos) via ffprobe, ou None se falhar."""
63
  try:
64
- cmd = [
65
- "ffprobe", "-v", "error", "-show_entries", "format=duration",
66
- "-of", "json", path
67
- ]
68
  out = subprocess.run(cmd, check=True, capture_output=True)
69
  data = json.loads(out.stdout.decode("utf-8", errors="ignore"))
70
  dur = float(data.get("format", {}).get("duration", 0.0))
@@ -73,6 +68,21 @@ def probe_duration(path: str) -> Optional[float]:
73
  print(f"[ffprobe] falhou: {e}")
74
  return None
75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  # ======================= FACE TRACKING =======================
77
 
78
  class FaceTracker:
@@ -176,42 +186,82 @@ class FaceTracker:
176
 
177
  return (crop_x, crop_y, crop_w, crop_h)
178
 
179
- # ======================= TRANSCRIÇÃO =======================
 
 
 
 
 
 
 
 
 
 
 
 
 
180
 
181
- def extract_audio_wav(input_video: str, sr: int = 16000) -> str:
182
- """Extrai o áudio para WAV mono 16kHz para robustez da transcrição."""
183
- fd, tmp_path = tempfile.mkstemp(suffix=".wav")
184
  os.close(fd)
185
- print(f"[ffmpeg] extraindo WAV -> {tmp_path}")
186
- cmd = [
187
- "ffmpeg", "-y", "-i", input_video,
188
- "-vn", "-ac", "1", "-ar", str(sr), "-f", "wav", tmp_path
 
 
 
 
 
 
189
  ]
190
- subprocess.run(cmd, check=True, capture_output=True)
191
- return tmp_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
 
193
- def transcribe(video_file: str, model_size: str = "small") -> List[Segment]:
194
- true_path = resolve_video_path(video_file)
195
- if not true_path or not os.path.exists(true_path):
196
- print(f"[transcribe] caminho inválido: {video_file}")
197
- return []
198
 
199
- # Durações para diagnóstico
200
- vid_dur = probe_duration(true_path)
201
- print(f"[probe] duração do vídeo: {vid_dur:.2f}s" if vid_dur else "[probe] duração do vídeo: desconhecida")
202
 
203
- print(f"[whisper] carregando modelo: {model_size}")
204
- model = whisper.load_model(model_size) # device auto
205
- print(f"[whisper] extraindo áudio WAV…")
206
- audio_wav = extract_audio_wav(true_path, sr=16000)
207
 
 
208
  wav_dur = probe_duration(audio_wav)
209
- print(f"[probe] duração do WAV: {wav_dur:.2f}s" if wav_dur else "[probe] duração do WAV: desconhecida")
210
- if vid_dur and wav_dur and wav_dur + 1 < vid_dur:
211
- print("[aviso] WAV menor que o vídeo — verifique codecs/ffmpeg. Mesmo assim vou transcrever o que foi extraído.")
212
 
213
  print("[whisper] transcrevendo…")
214
- # Configs mais robustas para CPU/Spaces
215
  result = model.transcribe(
216
  audio_wav,
217
  language="pt",
@@ -224,8 +274,8 @@ def transcribe(video_file: str, model_size: str = "small") -> List[Segment]:
224
 
225
  segments = [Segment(start=s["start"], end=s["end"], text=s["text"].strip())
226
  for s in result.get("segments", [])]
227
-
228
  print(f"[whisper] segmentos: {len(segments)}")
 
229
  try:
230
  Path(audio_wav).unlink(missing_ok=True)
231
  except Exception:
@@ -256,7 +306,7 @@ def extract_video_segment(input_video: str, output_video: str, start_time: float
256
 
257
  def apply_smart_crop_to_video(input_path: str, output_path: str, target_width: int,
258
  target_height: int, sample_frames: int = 10) -> bool:
259
- """Calcula o melhor crop com rastreamento facial e aplica o crop com FFmpeg preservando o áudio."""
260
  tracker = FaceTracker()
261
  cap = cv2.VideoCapture(input_path)
262
  if not cap.isOpened():
@@ -434,7 +484,6 @@ def generate_creative_cuts(video_file: str, segments: List[Segment], output_dir:
434
  Path(output_dir).mkdir(parents=True, exist_ok=True)
435
  outputs = []
436
 
437
- import random
438
  for i in range(int(k)):
439
  num_blocks = random.randint(min_blocks, min(max_blocks, len(segments)))
440
  step = max(1, len(segments) // num_blocks)
@@ -563,7 +612,7 @@ with gr.Blocks(title="Editor de Cortes Automático", css=css) as demo:
563
  maxb = gr.Number(value=8, label="Blocos max")
564
  with gr.Row():
565
  k2 = gr.Number(value=2, label="Quantidade")
566
- gap2 = gr.Number(value=0.60, label="Gap")
567
  pad2 = gr.Number(value=0.08, label="Pad")
568
  ar_mode2 = gr.Dropdown(["Original","Vertical 9:16","Quadrado 1:1","Retrato 4:5"],
569
  value="Original", label="Formato")
@@ -581,5 +630,5 @@ with gr.Blocks(title="Editor de Cortes Automático", css=css) as demo:
581
  outputs=[out_creative, status_creative])
582
 
583
  if __name__ == "__main__":
 
584
  demo.queue(max_size=20).launch()
585
-
 
15
  import os
16
  import shutil
17
  import json
18
+ import random
19
 
20
  # ======================= DATACLASSES =======================
21
 
 
43
  # ======================= UTILS =======================
44
 
45
def resolve_video_path(v: Union[str, dict, None]) -> Optional[str]:
    """Normalize a Gradio video value to a file path string.

    Gradio may hand over either a plain path string or a dict that carries
    the path under the ``"name"`` or ``"path"`` key.

    Args:
        v: A path string, a dict with ``"name"`` and/or ``"path"`` entries,
            or ``None``.

    Returns:
        * ``None`` when ``v`` is ``None`` or is neither a string nor a dict.
        * ``v`` itself when it is a string (returned unchecked).
        * For a dict: the first of ``"name"`` / ``"path"`` that is a
          non-empty string pointing at an existing file; otherwise the first
          non-empty string among them (it may not exist on disk); otherwise
          ``None``.
    """
    if v is None:
        return None
    if isinstance(v, str):
        return v
    if not isinstance(v, dict):
        return None

    # Only non-empty strings are usable as paths; anything else stored under
    # these keys is ignored so the return type stays Optional[str].
    candidates = [
        value
        for value in (v.get("name"), v.get("path"))
        if isinstance(value, str) and value
    ]

    # Prefer a candidate that actually exists locally ("name" before "path").
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate

    # Nothing exists on disk: fall back to the first declared value so the
    # caller can still report which path was requested.
    return candidates[0] if candidates else None
58
 
59
  def probe_duration(path: str) -> Optional[float]:
60
+ """Retorna a duração (s) via ffprobe."""
61
  try:
62
+ cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", path]
 
 
 
63
  out = subprocess.run(cmd, check=True, capture_output=True)
64
  data = json.loads(out.stdout.decode("utf-8", errors="ignore"))
65
  dur = float(data.get("format", {}).get("duration", 0.0))
 
68
  print(f"[ffprobe] falhou: {e}")
69
  return None
70
 
71
def remux_video(src: str) -> str:
    """Remux ``src`` into a temporary MP4 without re-encoding.

    Runs ffmpeg with ``-fflags +genpts`` (regenerate missing presentation
    timestamps), ``-c copy`` (stream copy) and ``-movflags +faststart``.

    Args:
        src: Path of the input video.

    Returns:
        Path of the newly created temporary ``.mp4`` file. The caller is
        responsible for deleting it.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        OSError: If the ffmpeg executable cannot be started.
    """
    fd, tmp_path = tempfile.mkstemp(suffix=".mp4")
    # Only the path is needed; ffmpeg opens the file itself.
    os.close(fd)
    cmd = [
        "ffmpeg", "-y",
        "-fflags", "+genpts",
        "-i", src,
        "-c", "copy",
        "-movflags", "+faststart",
        tmp_path,
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except (OSError, subprocess.SubprocessError):
        # Do not leave the placeholder file behind when the remux fails.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
    return tmp_path
85
+
86
  # ======================= FACE TRACKING =======================
87
 
88
  class FaceTracker:
 
186
 
187
  return (crop_x, crop_y, crop_w, crop_h)
188
 
189
+ # ======================= TRANSCRIÇÃO (ROBUSTA) =======================
190
+
191
def extract_audio_wav_strong(input_video: str, sr: int = 16000) -> str:
    """Extract a mono 16-bit PCM WAV from a video, tolerating bad timestamps.

    Strategy:
      1. Remux the video with ``remux_video``.
      2. Decode the first audio stream of the remux to mono PCM at ``sr`` Hz.
      3. If the video duration is known and that WAV has no measurable
         duration or is more than 2 s shorter than the video, decode again
         straight from the original file and keep the second WAV only when
         it is longer than the first (or the first has no duration).

    Every temporary file except the WAV that is returned is deleted, also
    when one of the ffmpeg steps fails.

    Args:
        input_video: Path of the source video.
        sr: Target sample rate in Hz.

    Returns:
        Path of a temporary ``.wav`` file. The caller must delete it.

    Raises:
        subprocess.CalledProcessError: If an ffmpeg invocation fails.
    """
    vid_dur = probe_duration(input_video)
    print(f"[probe] video: {vid_dur:.2f}s" if vid_dur else "[probe] video: ?")

    remux = remux_video(input_video)
    print(f"[remux] -> {remux}")

    # Every temp file created here; all but the returned one are removed.
    temp_files = [remux]
    keep = None
    try:
        fd, wav_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        temp_files.append(wav_path)

        # Attempt 1: decode from the remuxed file to PCM.
        cmd1 = [
            "ffmpeg", "-y",
            "-i", remux,
            "-vn",
            "-map", "0:a:0?",  # trailing '?' makes the audio mapping optional
            "-ac", "1", "-ar", str(sr),
            "-c:a", "pcm_s16le",
            wav_path,
        ]
        subprocess.run(cmd1, check=True, capture_output=True)
        wav_dur = probe_duration(wav_path)
        print(f"[probe] wav #1: {wav_dur:.2f}s" if wav_dur else "[probe] wav #1: ?")

        keep = wav_path

        # Fallback: decode directly from the original file.
        if vid_dur and (not wav_dur or wav_dur + 2 < vid_dur):
            print("[fallback] re-decodificando o arquivo original…")
            fd2, wav2 = tempfile.mkstemp(suffix=".wav")
            os.close(fd2)
            temp_files.append(wav2)
            cmd2 = [
                "ffmpeg", "-y",
                "-fflags", "+genpts",
                "-i", input_video,
                "-vn",
                "-ac", "1", "-ar", str(sr),
                "-c:a", "pcm_s16le",
                wav2,
            ]
            try:
                subprocess.run(cmd2, check=True, capture_output=True)
            except BaseException:
                # Nothing will be returned; let the cleanup remove every file.
                keep = None
                raise
            wav2_dur = probe_duration(wav2)
            print(f"[probe] wav #2: {wav2_dur:.2f}s" if wav2_dur else "[probe] wav #2: ?")
            if wav2_dur and (not wav_dur or wav2_dur > wav_dur):
                keep = wav2

        return keep
    finally:
        # Best-effort cleanup: a failed delete must not mask the result or
        # the original exception.
        for tmp in temp_files:
            if tmp != keep:
                try:
                    Path(tmp).unlink(missing_ok=True)
                except OSError:
                    pass
251
 
252
+ def transcribe(video_file: str, model_size: str = "small") -> List[Segment]:
253
+ print(f"[whisper] modelo: {model_size}")
254
+ model = whisper.load_model(model_size)
255
 
256
+ print("[audio] extraindo WAV robusto…")
257
+ audio_wav = extract_audio_wav_strong(video_file, sr=16000)
 
 
258
 
259
+ vid_dur = probe_duration(video_file)
260
  wav_dur = probe_duration(audio_wav)
261
+ if vid_dur: print(f"[dur] vídeo: {vid_dur:.2f}s")
262
+ if wav_dur: print(f"[dur] wav: {wav_dur:.2f}s")
 
263
 
264
  print("[whisper] transcrevendo…")
 
265
  result = model.transcribe(
266
  audio_wav,
267
  language="pt",
 
274
 
275
  segments = [Segment(start=s["start"], end=s["end"], text=s["text"].strip())
276
  for s in result.get("segments", [])]
 
277
  print(f"[whisper] segmentos: {len(segments)}")
278
+
279
  try:
280
  Path(audio_wav).unlink(missing_ok=True)
281
  except Exception:
 
306
 
307
  def apply_smart_crop_to_video(input_path: str, output_path: str, target_width: int,
308
  target_height: int, sample_frames: int = 10) -> bool:
309
+ """Calcula o melhor crop com rastreamento facial e aplica com FFmpeg preservando áudio."""
310
  tracker = FaceTracker()
311
  cap = cv2.VideoCapture(input_path)
312
  if not cap.isOpened():
 
484
  Path(output_dir).mkdir(parents=True, exist_ok=True)
485
  outputs = []
486
 
 
487
  for i in range(int(k)):
488
  num_blocks = random.randint(min_blocks, min(max_blocks, len(segments)))
489
  step = max(1, len(segments) // num_blocks)
 
612
  maxb = gr.Number(value=8, label="Blocos max")
613
  with gr.Row():
614
  k2 = gr.Number(value=2, label="Quantidade")
615
gap2 = gr.Number(value=0.60, label="Gap")
616
  pad2 = gr.Number(value=0.08, label="Pad")
617
  ar_mode2 = gr.Dropdown(["Original","Vertical 9:16","Quadrado 1:1","Retrato 4:5"],
618
  value="Original", label="Formato")
 
630
  outputs=[out_creative, status_creative])
631
 
632
  if __name__ == "__main__":
633
+ # Fila para tarefas longas (compatível com Gradio 4)
634
  demo.queue(max_size=20).launch()