import os import streamlit as st import requests import cv2 import numpy as np from pathlib import Path import subprocess import shutil # Gradio Space URL url = 'https://your-gradio-space-url/api/endpoint' # Replace with your actual URL # Load the write token from an environment variable write_token = os.getenv('WF') # Function to make a request def make_request(data): headers = { 'Authorization': f'Bearer {write_token}', # Add any other headers required by the API } response = requests.post(url, json=data, headers=headers) return response.json() # --- Uploads --- video_file = st.file_uploader("Upload video", type=["mp4", "mov"]) logo_file = st.file_uploader("Upload logo PNG", type=["png"]) logo_size_option = st.selectbox("Logo size", [64, 128, 256]) # --- Shader toggle --- shader_option = st.radio("Apply Shader?", ["Yes", "No"]) shader_code = """ def shader_frame(width, height, t): x = np.linspace(0, 1, width) y = np.linspace(0, 1, height) xv, yv = np.meshgrid(x, y) r = ((xv + 0.5*np.sin(t)) % 1.0 * 255).astype(np.uint8) g = ((yv + 0.5*np.cos(t)) % 1.0 * 255).astype(np.uint8) b = (0.5*255*np.ones_like(r)).astype(np.uint8) frame = np.stack([b, g, r], axis=2) return frame """ if shader_option == "Yes": st.text_area("Current shader code", shader_code, height=200) # --- Paths --- video_path = Path("video.mp4") if video_file: video_path.write_bytes(video_file.getbuffer()) status_placeholder = st.empty() # --- Shader generator --- def shader_frame(width, height, t): x = np.linspace(0, 1, width) y = np.linspace(0, 1, height) xv, yv = np.meshgrid(x, y) r = ((xv + 0.5*np.sin(t)) % 1.0 * 255).astype(np.uint8) g = ((yv + 0.5*np.cos(t)) % 1.0 * 255).astype(np.uint8) b = (0.5*255*np.ones_like(r)).astype(np.uint8) frame = np.stack([b, g, r], axis=2) return frame # --- Logo loader --- logo = None has_alpha = False if logo_file: logo_data = np.frombuffer(logo_file.read(), np.uint8) logo = cv2.imdecode(logo_data, cv2.IMREAD_UNCHANGED) has_alpha = logo.shape[2] == 4 if logo.ndim == 3 else False st.text(f"Logo loaded, transparency: {'Yes' if has_alpha else 'No'}") # Resize logo lh, lw = logo.shape[:2] scale = logo_size_option / max(lw, lh) new_w = int(lw * scale) new_h = int(lh * scale) logo = cv2.resize(logo, (new_w, new_h), interpolation=cv2.INTER_AREA) # --- Logo overlay --- def overlay_logo(frame, logo, has_alpha): height, width = frame.shape[:2] lh, lw = logo.shape[:2] y1, y2 = height - lh - 10, height - 10 x1, x2 = width - lw - 10, width - 10 if has_alpha: alpha = logo[:, :, 3] / 255.0 for c in range(3): frame[y1:y2, x1:x2, c] = ( alpha * logo[:, :, c] + (1 - alpha) * frame[y1:y2, x1:x2, c] ) else: frame[y1:y2, x1:x2] = logo[:, :, :3] return frame # --- GPU detection --- def has_nvenc(): try: result = subprocess.run(["ffmpeg", "-encoders"], capture_output=True, text=True) return "h264_nvenc" in result.stdout except Exception: return False # --- Preview generator --- def generate_preview(): if not video_file: status_placeholder.warning("Upload a video first!") return cap = cv2.VideoCapture(str(video_path)) fps = cap.get(cv2.CAP_PROP_FPS) or 30 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) st.text(f"Preview resolution: {width} x {height} @ {fps:.1f} FPS") tmp_out = Path("temp_frames.mp4") out = cv2.VideoWriter(str(tmp_out), cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)) t = 0.0 dt = 1.0 / fps frame_idx = 0 preview_display = st.empty() progress_bar = st.progress(0) while True: ret, frame = cap.read() if not ret: break # Shader if shader_option == "Yes": shader = shader_frame(width, height, t) blended = cv2.addWeighted(frame, 0.7, shader, 0.3, 0) else: blended = frame.copy() # Logo if logo is not None: blended = overlay_logo(blended, logo, has_alpha) out.write(blended) # Live preview preview_small = cv2.resize(blended, (width // 2, height // 2)) preview_display.image(preview_small, channels="BGR") t += dt frame_idx += 1 progress_bar.progress(min(frame_idx / max(total_frames, 1), 1.0)) cap.release() out.release() # --- Merge audio --- output_path = Path("preview.mp4") encoder = "h264_nvenc" if has_nvenc() else "libx264" st.info(f"Encoding with: {encoder}") cmd = [ "ffmpeg", "-y", "-i", str(tmp_out), "-i", str(video_path), "-map", "0:v:0", "-map", "1:a:0", "-c:v", encoder, "-preset", "fast", "-b:v", "5M", "-c:a", "aac", str(output_path) ] try: subprocess.run(cmd, check=True) st.success("✅ Preview generated successfully with audio!") st.video(str(output_path)) st.markdown(f"[📥 Download preview](./{output_path})") except subprocess.CalledProcessError as e: st.error("❌ FFmpeg failed to merge audio.") st.text(e) # --- Buttons --- if st.button("Generate Preview"): generate_preview() if st.button("Go Live"): if not video_file: st.warning("Upload a video first!") else: st.info("Starting live stream...") # Start live stream using FFmpeg live_stream_url = "rtmp://your-rtmp-server/live/stream" live_stream_cmd = [ "ffmpeg", "-re", "-i", str(video_path), "-c:v", "libx264", "-preset", "fast", "-b:v", "5M", "-c:a", "aac", "-f", "flv", live_stream_url ] try: subprocess.Popen(live_stream_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) st.success("✅ Live stream started successfully!") except subprocess.CalledProcessError as e: st.error("❌ Failed to start live stream.") st.text(e)