| import streamlit as st
|
| import cv2
|
| import numpy as np
|
| from pathlib import Path
|
| import subprocess
|
| import shutil
|
|
|
# ---------------------------------------------------------------------------
# Page header and user inputs.
# ---------------------------------------------------------------------------
st.title("🎬 Live Preview + Shader + Logo App (GPU/CPU + Audio)")

# Source clip, optional logo, and the logo's longest-side target in pixels.
video_file = st.file_uploader("Upload video", type=["mp4", "mov"])
logo_file = st.file_uploader("Upload logo PNG", type=["png"])
logo_size_option = st.selectbox("Logo size", [64, 128, 256])

# ---------------------------------------------------------------------------
# Shader toggle.
# ---------------------------------------------------------------------------
shader_option = st.radio("Apply Shader?", ["Yes", "No"])

# Text copy of the shader, shown for reference only: the text area's return
# value is discarded, so edits made in the UI do not affect the running
# shader_frame() function defined in this file.
shader_code = "\n".join(
    [
        "",
        "def shader_frame(width, height, t):",
        "    x = np.linspace(0, 1, width)",
        "    y = np.linspace(0, 1, height)",
        "    xv, yv = np.meshgrid(x, y)",
        "    r = ((xv + 0.5*np.sin(t)) % 1.0 * 255).astype(np.uint8)",
        "    g = ((yv + 0.5*np.cos(t)) % 1.0 * 255).astype(np.uint8)",
        "    b = (0.5*255*np.ones_like(r)).astype(np.uint8)",
        "    frame = np.stack([b, g, r], axis=2)",
        "    return frame",
        "",
    ]
)
if shader_option == "Yes":
    st.text_area("Current shader code", shader_code, height=200)

# ---------------------------------------------------------------------------
# Persist the upload to disk so OpenCV and ffmpeg can open it by path.
# ---------------------------------------------------------------------------
video_path = Path("video.mp4")
if video_file:
    with video_path.open("wb") as video_fh:
        video_fh.write(video_file.getbuffer())

# Slot reused by generate_preview() for status messages.
status_placeholder = st.empty()
|
|
|
|
|
def shader_frame(width, height, t):
    """Build one animated gradient frame in B, G, R channel order.

    Args:
        width: Number of columns in the frame.
        height: Number of rows in the frame.
        t: Animation time; shifts the red ramp by ``0.5 * sin(t)`` and the
            green ramp by ``0.5 * cos(t)``.

    Returns:
        A ``(height, width, 3)`` ``uint8`` array. Channel 0 is a constant
        127, channel 1 varies per row and channel 2 varies per column.
    """
    # One normalised coordinate per column / per row, both spanning [0, 1].
    col_pos = np.linspace(0, 1, width)
    row_pos = np.linspace(0, 1, height)

    # Wrap the shifted ramps back into [0, 1) before scaling to 8 bits.
    red_by_col = ((col_pos + 0.5 * np.sin(t)) % 1.0 * 255).astype(np.uint8)
    green_by_row = ((row_pos + 0.5 * np.cos(t)) % 1.0 * 255).astype(np.uint8)

    frame = np.empty((height, width, 3), dtype=np.uint8)
    frame[:, :, 0] = int(0.5 * 255)
    frame[:, :, 1] = green_by_row[:, np.newaxis]
    frame[:, :, 2] = red_by_col[np.newaxis, :]
    return frame
|
|
|
|
|
# Decoded (and resized) logo image, or None when no usable logo was uploaded.
logo = None
# True only when the decoded logo has four channels (colour plus alpha).
has_alpha = False
if logo_file:
    # getvalue() returns the complete upload regardless of the current stream
    # position, whereas read() yields nothing once the buffer was consumed.
    logo_data = np.frombuffer(logo_file.getvalue(), np.uint8)

    # imdecode signals undecodable data by returning None; an empty buffer is
    # skipped up front because there is nothing to decode.
    decoded = (
        cv2.imdecode(logo_data, cv2.IMREAD_UNCHANGED) if logo_data.size else None
    )
    if decoded is None:
        st.error("Could not decode the logo image; continuing without a logo.")
    else:
        logo = decoded
        has_alpha = logo.ndim == 3 and logo.shape[2] == 4
        st.text(f"Logo loaded, transparency: {'Yes' if has_alpha else 'No'}")

        # Scale so the longest side equals the selected size, keeping the
        # aspect ratio. Each side is clamped to at least one pixel because
        # cv2.resize rejects a zero-sized target (very elongated logos).
        lh, lw = logo.shape[:2]
        scale = logo_size_option / max(lw, lh)
        new_w = max(1, int(lw * scale))
        new_h = max(1, int(lh * scale))
        logo = cv2.resize(logo, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
|
|
|
|
def overlay_logo(frame, logo, has_alpha, margin=10):
    """Draw *logo* into the bottom-right corner of *frame*, in place.

    Args:
        frame: Image array of shape ``(height, width, channels)`` with at
            least three channels; its first three channels are modified.
        logo: Logo image, either 2-D (single channel) or 3-D. When it has
            four or more channels and *has_alpha* is true, channel 3 is used
            as the opacity mask (0-255).
        has_alpha: Whether to alpha-blend using the logo's fourth channel.
            Ignored (treated as opaque) if the logo has no fourth channel.
        margin: Gap in pixels between the logo and the frame's bottom and
            right edges.

    Returns:
        The same *frame* object, after modification.

    A logo that does not fit inside the frame minus the margin is cropped to
    its top-left portion instead of raising a shape-mismatch error. If no
    room is left at all, the frame is returned untouched.
    """
    height, width = frame.shape[:2]

    # Anchor the logo's bottom-right corner `margin` pixels inside the frame,
    # clamping at zero so slices never wrap around through negative indices.
    y2 = max(height - margin, 0)
    x2 = max(width - margin, 0)
    y1 = max(y2 - logo.shape[0], 0)
    x1 = max(x2 - logo.shape[1], 0)
    region_h, region_w = y2 - y1, x2 - x1
    if region_h == 0 or region_w == 0:
        return frame

    patch = logo[:region_h, :region_w]
    if patch.ndim == 2:
        # Single-channel logo: add a channel axis so it broadcasts over B, G, R.
        patch = patch[:, :, np.newaxis]

    # Fewer than three channels means grey (optionally plus alpha): use the
    # first channel for all three colour planes via broadcasting.
    colour = patch[:, :, :3] if patch.shape[2] >= 3 else patch[:, :, :1]

    region = frame[y1:y2, x1:x2, :3]  # view: writes land in `frame`
    if has_alpha and patch.shape[2] >= 4:
        alpha = patch[:, :, 3:4] / 255.0
        region[...] = alpha * colour + (1 - alpha) * region
    else:
        region[...] = colour
    return frame
|
|
|
|
|
def has_nvenc():
    """Report whether the local ``ffmpeg`` lists the ``h264_nvenc`` encoder.

    Runs ``ffmpeg -encoders`` and searches its standard output for the
    encoder name. This only inspects the encoder list; it does not try to
    open the encoder, so callers should still be prepared for an encode
    with ``h264_nvenc`` to fail.

    Returns:
        ``True`` if ``h264_nvenc`` appears in the listing. ``False`` if it
        does not, or if ffmpeg cannot be started or does not answer within
        the timeout.
    """
    try:
        result = subprocess.run(
            ["ffmpeg", "-hide_banner", "-encoders"],
            capture_output=True,
            text=True,
            errors="replace",
            timeout=15,  # never let a stuck ffmpeg freeze the app
            check=False,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError: ffmpeg missing or not executable.
        # SubprocessError: covers TimeoutExpired.
        return False
    return "h264_nvenc" in result.stdout
|
|
|
|
|
def _render_frames(cap, out, fps, width, height, total_frames):
    """Process every frame of *cap*, write it to *out* and show a live preview."""
    t = 0.0
    dt = 1.0 / fps
    frame_idx = 0
    use_shader = shader_option == "Yes"
    # Half-size preview, clamped to 1x1 because cv2.resize rejects empty sizes.
    preview_size = (max(width // 2, 1), max(height // 2, 1))

    preview_display = st.empty()
    progress_bar = st.progress(0)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if use_shader:
            shader = shader_frame(width, height, t)
            blended = cv2.addWeighted(frame, 0.7, shader, 0.3, 0)
        else:
            blended = frame.copy()

        if logo is not None:
            blended = overlay_logo(blended, logo, has_alpha)

        out.write(blended)

        preview_small = cv2.resize(blended, preview_size)
        preview_display.image(preview_small, channels="BGR")

        t += dt
        frame_idx += 1
        # The reported frame count may be 0 or too small; keep the bar in [0, 1].
        progress_bar.progress(min(frame_idx / max(total_frames, 1), 1.0))


def _encode_preview(frames_path, source_path, output_path, encoders):
    """Try each encoder in turn; return None on success, else the last error."""
    failure = None
    for encoder in encoders:
        st.info(f"Encoding with: {encoder}")
        cmd = [
            "ffmpeg", "-y",
            "-i", str(frames_path),
            "-i", str(source_path),
            "-map", "0:v:0",
            # Trailing "?" makes the audio mapping optional, so a source clip
            # without an audio stream no longer makes ffmpeg fail.
            "-map", "1:a:0?",
            "-c:v", encoder,
            "-preset", "fast",
            "-b:v", "5M",
            "-c:a", "aac",
            str(output_path),
        ]
        try:
            subprocess.run(
                cmd, check=True, capture_output=True, text=True, errors="replace"
            )
        except subprocess.CalledProcessError as e:
            failure = e  # this encoder failed; fall through to the next one
        except OSError as e:
            return e  # ffmpeg itself cannot be started; no encoder will help
        else:
            return None
    return failure


def generate_preview():
    """Render the uploaded video with shader/logo and show the encoded result.

    Reads the module-level ``video_file``, ``video_path``, ``shader_option``,
    ``logo`` and ``has_alpha``. Steps:

    1. Decode ``video_path`` with OpenCV, apply the optional shader blend and
       logo overlay per frame, and write the frames to ``temp_frames.mp4``
       while updating a half-size live preview and a progress bar.
    2. Re-encode those frames into ``preview.mp4`` with ffmpeg, taking the
       audio track from the original upload when it has one. ``h264_nvenc``
       is tried first if ffmpeg lists it, with ``libx264`` as the fallback.
    3. Show the result with a player and a download button.

    Problems (no upload, unreadable video, writer or ffmpeg failure) are
    reported in the UI. The capture and writer are always released and the
    intermediate file is removed afterwards.
    """
    if not video_file:
        status_placeholder.warning("Upload a video first!")
        return

    tmp_out = Path("temp_frames.mp4")
    cap = cv2.VideoCapture(str(video_path))
    out = None
    try:
        if not cap.isOpened():
            status_placeholder.error("Could not open the uploaded video.")
            return

        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if width <= 0 or height <= 0:
            status_placeholder.error("Could not read the video dimensions.")
            return

        st.text(f"Preview resolution: {width} x {height} @ {fps:.1f} FPS")

        out = cv2.VideoWriter(
            str(tmp_out),
            cv2.VideoWriter_fourcc(*"mp4v"),
            fps,
            (width, height),
        )
        if not out.isOpened():
            status_placeholder.error("Could not open the temporary video writer.")
            return

        _render_frames(cap, out, fps, width, height, total_frames)
    finally:
        # Release on every path so the files are closed before ffmpeg reads
        # them, and so an exception does not leak the handles.
        cap.release()
        if out is not None:
            out.release()

    output_path = Path("preview.mp4")
    encoders = ["h264_nvenc", "libx264"] if has_nvenc() else ["libx264"]
    try:
        failure = _encode_preview(tmp_out, video_path, output_path, encoders)
    finally:
        # The raw frame dump is only an intermediate; do not leave it behind.
        try:
            tmp_out.unlink()
        except OSError:
            pass

    if failure is not None:
        st.error("❌ FFmpeg failed to merge audio.")
        st.text(failure)
        stderr_text = getattr(failure, "stderr", None)
        if stderr_text:
            # The tail of ffmpeg's log normally holds the actual error.
            st.text(stderr_text[-2000:])
        return

    st.success("✅ Preview generated successfully with audio!")
    st.video(str(output_path))
    # A markdown link to a local path is not served by Streamlit; hand the
    # bytes to a download button instead.
    st.download_button(
        "📥 Download preview",
        data=output_path.read_bytes(),
        file_name=output_path.name,
        mime="video/mp4",
    )
|
|
|
|
|
# Run the render + encode pipeline on demand. The button is created before
# the pipeline runs so its output appears below it on the page.
generate_clicked = st.button("Generate Preview")
if generate_clicked:
    generate_preview()

# Placeholder action: the button only shows a warning when clicked.
go_live_clicked = st.button("Go Live")
if go_live_clicked:
    st.warning("Go Live is not implemented yet.")
|
|
|