import gradio as gr import subprocess import os import re import json import shutil from PIL import Image, ImageDraw # ══════════════════════════════════════════════════════════════════ # Utilidades # ══════════════════════════════════════════════════════════════════ def gpu_disponivel(): """Verifica se a GPU NVIDIA está acessível via nvidia-smi.""" r = subprocess.run(["nvidia-smi"], capture_output=True, text=True) return r.returncode == 0 def contar_frames(path): """Conta frames via nb_read_packets — mais confiável que nb_frames.""" r = subprocess.run([ "ffprobe", "-v", "error", "-select_streams", "v:0", "-count_packets", "-show_entries", "stream=nb_read_packets", "-of", "json", path ], capture_output=True, text=True) try: val = json.loads(r.stdout).get("streams", [{}])[0].get("nb_read_packets") return int(val) if val else None except (json.JSONDecodeError, ValueError, TypeError): return None # ══════════════════════════════════════════════════════════════════ # Logo — preparação, posição e preview ao vivo # ══════════════════════════════════════════════════════════════════ def preparar_logo(logo_path, opacidade_pct): """ SOMENTE aplica a opacidade (escala o canal alpha) via Pillow. O redimensionamento é feito pelo próprio FFmpeg com scale filter — exatamente como o LegendadorBrasileiroWhisperX faz. Assim evitamos re-saves desnecessários da PNG e preservamos as bordas originais. """ img = Image.open(logo_path).convert("RGBA") if opacidade_pct < 100: r, g, b, a = img.split() fator = opacidade_pct / 100.0 a = a.point(lambda x: int(x * fator)) img = Image.merge("RGBA", (r, g, b, a)) temp_path = "/tmp/logo_overlay.png" img.save(temp_path, "PNG") return temp_path def calcular_posicao_logo(posicao, margem=20, offset_x=0, offset_y=0): """Retorna a expressão x=...:y=... para o filtro overlay do FFmpeg.""" ox, oy = offset_x, offset_y m = margem return { "Centro": f"x=(W-w)/2+{ox}:y=(H-h)/2+{oy}", "Canto superior esquerdo": f"x={m+ox}:y={m+oy}", "Canto superior direito": f"x=W-w-{m}+{ox}:y={m+oy}", "Canto inferior esquerdo": f"x={m+ox}:y=H-h-{m}+{oy}", "Canto inferior direito": f"x=W-w-{m}+{ox}:y=H-h-{m}+{oy}", }.get(posicao, f"x=(W-w)/2+{ox}:y=(H-h)/2+{oy}") # ── Preview ao vivo ─────────────────────────────────────────────── PREVIEW_W, PREVIEW_H = 640, 360 MARGEM_PREVIEW = 12 def gerar_preview_logo(logo_file, logo_posicao, logo_margem, logo_offset_x, logo_offset_y, logo_tamanho, logo_opacidade): """Gera um preview PNG mostrando a logo posicionada sobre um fundo simulado.""" canvas = Image.new("RGBA", (PREVIEW_W, PREVIEW_H), (30, 30, 30, 255)) draw = ImageDraw.Draw(canvas) for x in range(0, PREVIEW_W, 40): draw.line([(x, 0), (x, PREVIEW_H)], fill=(50, 50, 50, 255), width=1) for y in range(0, PREVIEW_H, 40): draw.line([(0, y), (PREVIEW_W, y)], fill=(50, 50, 50, 255), width=1) draw.text((PREVIEW_W // 2 - 60, PREVIEW_H // 2 - 8), "[ seu vídeo ]", fill=(80, 80, 80, 255)) if logo_file is None: out = "/tmp/logo_preview.png" canvas.convert("RGB").save(out) return out try: logo = Image.open(logo_file).convert("RGBA") alvo_w = max(1, int(PREVIEW_W * logo_tamanho / 100)) ratio = alvo_w / logo.width alvo_h = max(1, int(logo.height * ratio)) logo = logo.resize((alvo_w, alvo_h), Image.LANCZOS) # Aplica opacidade (idêntico ao radar-virumania) if logo_opacidade < 100: r, g, b, a = logo.split() fator = logo_opacidade / 100.0 a = a.point(lambda px: int(px * fator)) logo = Image.merge("RGBA", (r, g, b, a)) escala = PREVIEW_W / 1920 mp = int(logo_margem * escala) ox = int(logo_offset_x * escala) oy = int(logo_offset_y * escala) pos_map = { "Centro": ((PREVIEW_W - alvo_w) // 2 + ox, (PREVIEW_H - alvo_h) // 2 + oy), "Canto superior esquerdo": (mp + ox, mp + oy), "Canto superior direito": (PREVIEW_W - alvo_w - mp + ox, mp + oy), "Canto inferior esquerdo": (mp + ox, PREVIEW_H - alvo_h - mp + oy), "Canto inferior direito": (PREVIEW_W - alvo_w - mp + ox, PREVIEW_H - alvo_h - mp + oy), } px, py = pos_map.get(logo_posicao, ((PREVIEW_W - alvo_w) // 2 + ox, (PREVIEW_H - alvo_h) // 2 + oy)) px = max(0, min(px, PREVIEW_W - alvo_w)) py = max(0, min(py, PREVIEW_H - alvo_h)) canvas.alpha_composite(logo, (px, py)) except Exception: pass out = "/tmp/logo_preview.png" canvas.convert("RGB").save(out) return out # ══════════════════════════════════════════════════════════════════ # Áudio — perfis disponíveis # ══════════════════════════════════════════════════════════════════ # label → (encoder, profile_arg_list) AUDIO_CODECS = { "libfdk_aac HE-AACv2 (mais eficiente, só estéreo)": ("libfdk_aac", ["-profile:a", "aac_he_v2"]), "libfdk_aac HE-AAC (eficiente, mono ou estéreo)": ("libfdk_aac", ["-profile:a", "aac_he"]), "libfdk_aac LC (alta qualidade, qualquer bitrate)": ("libfdk_aac", ["-profile:a", "aac_low"]), "aac (codec nativo do FFmpeg, LC)": ("aac", []), } def montar_args_audio(codec_label, bitrate, sample_rate, canais): """Monta a lista de argumentos FFmpeg para o codec de áudio escolhido.""" encoder, prof = AUDIO_CODECS.get(codec_label, AUDIO_CODECS[ "libfdk_aac HE-AACv2 (mais eficiente, só estéreo)" ]) # HE-AACv2 exige estéreo — força se o usuário pediu mono if "he_v2" in " ".join(prof) and canais == 1: canais = 2 args = ["-c:a", encoder] + prof + [ "-b:a", bitrate, "-ar", str(sample_rate), "-ac", str(canais), ] return args # ══════════════════════════════════════════════════════════════════ # Pipeline principal # ══════════════════════════════════════════════════════════════════ def reencode_video( video_file, modo, resolucao, fps, crf_valor, # áudio audio_codec_label, audio_bitrate, audio_sample_rate, audio_canais, normalizar_audio, # filtros remover_duplicados, # logo logo_file, logo_posicao, logo_margem, logo_offset_x, logo_offset_y, logo_tamanho, logo_opacidade, ): if video_file is None: yield None, "❌ Nenhum vídeo enviado!" return input_path = "input.mp4" output_path = "output_reencoded.mp4" shutil.copy(video_file, input_path) # ── Detecta dimensão / orientação ─────────────────────────── probe = subprocess.run([ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "json", input_path ], capture_output=True, text=True) probe_data = json.loads(probe.stdout) stream = probe_data.get("streams", [{}])[0] vid_w = int(stream.get("width", 1920)) vid_h = int(stream.get("height", 1080)) is_vertical = vid_h > vid_w # ── Duração ───────────────────────────────────────────────── probe_dur = subprocess.run([ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", input_path ], capture_output=True, text=True) dur_data = json.loads(probe_dur.stdout) total_duration = float(dur_data.get("format", {}).get("duration", 0)) # ── Detecta áudio ─────────────────────────────────────────── probe_audio = subprocess.run([ "ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=codec_type", "-of", "json", input_path ], capture_output=True, text=True) has_audio = len(json.loads(probe_audio.stdout).get("streams", [])) > 0 # ── GPU / logo ────────────────────────────────────────────── quer_gpu = "GPU" in modo use_gpu = quer_gpu and gpu_disponivel() gpu_fallback = quer_gpu and not use_gpu tem_logo = logo_file is not None full_gpu_pipeline = ( use_gpu and not remover_duplicados and resolucao == "Original" and not tem_logo ) # ── Prepara a logo (Pillow) ───────────────────────────────── logo_tmp = None logo_width_px = 0 if tem_logo: try: logo_tmp = preparar_logo(logo_file, logo_opacidade) logo_width_px = max(1, int(vid_w * logo_tamanho / 100)) except Exception as e: yield None, f"❌ Erro ao processar logo: {e}" return # ── Filtros de vídeo base ─────────────────────────────────── vf_parts = [] if resolucao != "Original": m = re.search(r'(\d+)x(\d+)', resolucao) tw, th = m.group(1), m.group(2) if is_vertical: tw, th = th, tw vf_parts.append( f"scale={tw}:{th}:force_original_aspect_ratio=increase," f"crop={tw}:{th}" ) if not full_gpu_pipeline: vf_parts.append(f"fps={fps}") if remover_duplicados: vf_parts.append("mpdecimate=hi=1") # ── Monta comando FFmpeg ──────────────────────────────────── cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-progress", "pipe:1", "-y"] if full_gpu_pipeline: cmd += ["-hwaccel", "cuda", "-hwaccel_output_format", "cuda"] cmd += ["-i", input_path] if tem_logo: cmd += ["-i", logo_tmp] # ── filter_complex ────────────────────────────────────────── # O overlay do FFmpeg precisa de format=auto para lidar # corretamente com o canal alpha do PNG. if tem_logo: overlay_pos = calcular_posicao_logo( logo_posicao, logo_margem, logo_offset_x, logo_offset_y ) if vf_parts: fc = ( f"[0:v]{','.join(vf_parts)}[base];" f"[1:v]scale={logo_width_px}:-1[logo];" f"[base][logo]overlay={overlay_pos}:format=auto[vout]" ) else: fc = ( f"[1:v]scale={logo_width_px}:-1[logo];" f"[0:v][logo]overlay={overlay_pos}:format=auto[vout]" ) cmd += ["-filter_complex", fc, "-map", "[vout]"] if has_audio: cmd += ["-map", "0:a"] else: if vf_parts: cmd += ["-vf", ",".join(vf_parts)] if full_gpu_pipeline: cmd += ["-r", str(fps)] elif remover_duplicados and not tem_logo: cmd += ["-fps_mode", "vfr"] # ── Codec de vídeo ────────────────────────────────────────── if "x264" in modo: if use_gpu: cmd += ["-c:v", "h264_nvenc", "-preset", "p7", "-tune", "hq", "-rc", "vbr", "-cq", str(crf_valor), "-b:v", "0", "-profile:v", "high", "-pix_fmt", "yuv420p"] else: cmd += ["-c:v", "libx264", "-profile:v", "high", "-preset", "slow", "-crf", str(crf_valor), "-pix_fmt", "yuv420p"] else: # x265 if use_gpu: cmd += ["-c:v", "hevc_nvenc", "-preset", "p7", "-tune", "uhq", "-rc", "vbr", "-cq", str(crf_valor), "-b:v", "0", "-profile:v", "main", "-pix_fmt", "yuv420p"] else: cmd += ["-c:v", "libx265", "-preset", "slow", "-crf", str(crf_valor), "-pix_fmt", "yuv420p", "-x265-params", "sao=0:rd=6:psy-rd=1.0:psy-rdoq=2.0:rskip=1"] # ── Áudio (loudnorm + codec escolhido) ────────────────────── af = None if has_audio and normalizar_audio: yield None, "⏳ Analisando volume do áudio (1ª passada de loudnorm)..." result_ln = subprocess.run([ "ffmpeg", "-hide_banner", "-loglevel", "info", "-y", "-i", input_path, "-af", "loudnorm=print_format=json", "-f", "null", "-" ], capture_output=True, text=True) raw = result_ln.stderr start = raw.find("{") end = raw.rfind("}") + 1 if start != -1 and end != 0: try: stats = json.loads(raw[start:end]) af = ( f"loudnorm=I=-23:TP=-2:LRA=7:linear=true" f":measured_I={stats['input_i']}" f":measured_tp={stats['input_tp']}" f":measured_LRA={stats['input_lra']}" f":measured_thresh={stats['input_thresh']}" f":offset={stats['target_offset']}" ) except (json.JSONDecodeError, KeyError): normalizar_audio = False else: normalizar_audio = False if has_audio: if normalizar_audio and af: cmd += ["-af", af] cmd += montar_args_audio( audio_codec_label, audio_bitrate, audio_sample_rate, int(audio_canais) ) else: cmd += ["-an"] # ── Metadados e saída ─────────────────────────────────────── cmd += ["-movflags", "+faststart"] if has_audio: cmd += [ "-metadata:s:a:0", "language=por", "-metadata:s:a:0", "comment=Re-encoded by Super Re-Encoder", ] cmd.append(output_path) # ── Executa com progresso em tempo real ───────────────────── yield None, "⏳ Iniciando codificação..." process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) current_sec = 0.0 while True: line = process.stdout.readline() if not line and process.poll() is not None: break if line: line = line.strip() if line.startswith("out_time_ms="): val = line.split('=')[1] if val != 'N/A': current_sec = int(val) / 1000000 if total_duration > 0: pct = min(100, int(current_sec / total_duration * 100)) yield None, ( f"⏳ Codificando... {pct}% " f"({current_sec:.1f}s de {total_duration:.1f}s)" ) else: yield None, f"⏳ Codificando... {current_sec:.1f}s processados" if process.returncode != 0: err = process.stderr.read() yield None, f"❌ Erro no FFmpeg:\n{err[-1500:]}" return # ── Relatório final ───────────────────────────────────────── orig_mb = os.path.getsize(input_path) / (1024 * 1024) final_mb = os.path.getsize(output_path) / (1024 * 1024) reducao = round((1 - final_mb / orig_mb) * 100, 1) extras = [] if full_gpu_pipeline: extras.append("pipeline GPU completo (decode+encode na VRAM)") elif use_gpu: extras.append("encode NVENC / decode+filtros na CPU") if has_audio and normalizar_audio: extras.append("áudio normalizado (loudnorm -23 LUFS)") elif not has_audio: extras.append("⚠️ sem faixa de áudio") if remover_duplicados: frames_in = contar_frames(input_path) frames_out = contar_frames(output_path) if frames_in and frames_out: removidos = frames_in - frames_out pct_rem = round(removidos / frames_in * 100, 1) extras.append(f"mpdecimate: {removidos} frames removidos ({pct_rem}%)") else: extras.append("mpdecimate ativo") if tem_logo: extras.append( f"logo: {logo_posicao.lower()} | " f"tam {logo_tamanho}% | opacidade {logo_opacidade}%" ) if gpu_fallback: extras.append("⚠️ GPU indisponível → caiu para CPU") codec_v = ( "h264_nvenc" if "x264" in modo and use_gpu else "hevc_nvenc" if use_gpu else "libx264" if "x264" in modo else "libx265" ) yield output_path, ( f"✅ Concluído!\n" f"Original : {orig_mb:.1f} MB\n" f"Final : {final_mb:.1f} MB\n" f"Redução : {reducao}%\n" f"FPS : {fps}\n" f"CRF/CQ : {crf_valor}\n" f"Vídeo : {codec_v}\n" f"Áudio : {audio_codec_label} • {audio_bitrate} • " f"{audio_sample_rate} Hz • {audio_canais} canal(is)\n" f"Extras : {' | '.join(extras) if extras else 'nenhum'}" ) # ══════════════════════════════════════════════════════════════════ # Interface Gradio — reorganizada em ordem de fluxo # ══════════════════════════════════════════════════════════════════ CUSTOM_CSS = """ #app-header { text-align: center; margin: 6px 0 14px 0; } #app-header h1 { font-size: 1.9rem; margin: 0; letter-spacing: .5px; } #app-header p { color: #8b8b8b; margin: 4px 0 0 0; font-size: 0.95rem; } .gr-accordion { border-radius: 12px !important; } footer { visibility: hidden; } """ with gr.Blocks(title="Super Re-Encoder", theme=gr.themes.Soft(), css=CUSTOM_CSS) as demo: # ── Cabeçalho ─────────────────────────────────────────────── gr.HTML( """
Recodifica vídeos com alta qualidade e tamanho mínimo — com logo opcional.