Spaces:
Sleeping
Sleeping
| import os | |
| import streamlit as st | |
| import requests | |
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| import subprocess | |
| import shutil | |
| # Gradio Space URL | |
| url = 'https://your-gradio-space-url/api/endpoint' # Replace with your actual URL | |
| # Load the write token from an environment variable | |
| write_token = os.getenv('WF') | |
| # Function to make a request | |
| def make_request(data): | |
| headers = { | |
| 'Authorization': f'Bearer {write_token}', | |
| # Add any other headers required by the API | |
| } | |
| response = requests.post(url, json=data, headers=headers) | |
| return response.json() | |
| # --- Uploads --- | |
| video_file = st.file_uploader("Upload video", type=["mp4", "mov"]) | |
| logo_file = st.file_uploader("Upload logo PNG", type=["png"]) | |
| logo_size_option = st.selectbox("Logo size", [64, 128, 256]) | |
| # --- Shader toggle --- | |
| shader_option = st.radio("Apply Shader?", ["Yes", "No"]) | |
| shader_code = """ | |
| def shader_frame(width, height, t): | |
| x = np.linspace(0, 1, width) | |
| y = np.linspace(0, 1, height) | |
| xv, yv = np.meshgrid(x, y) | |
| r = ((xv + 0.5*np.sin(t)) % 1.0 * 255).astype(np.uint8) | |
| g = ((yv + 0.5*np.cos(t)) % 1.0 * 255).astype(np.uint8) | |
| b = (0.5*255*np.ones_like(r)).astype(np.uint8) | |
| frame = np.stack([b, g, r], axis=2) | |
| return frame | |
| """ | |
| if shader_option == "Yes": | |
| st.text_area("Current shader code", shader_code, height=200) | |
| # --- Paths --- | |
| video_path = Path("video.mp4") | |
| if video_file: | |
| video_path.write_bytes(video_file.getbuffer()) | |
| status_placeholder = st.empty() | |
| # --- Shader generator --- | |
| def shader_frame(width, height, t): | |
| x = np.linspace(0, 1, width) | |
| y = np.linspace(0, 1, height) | |
| xv, yv = np.meshgrid(x, y) | |
| r = ((xv + 0.5*np.sin(t)) % 1.0 * 255).astype(np.uint8) | |
| g = ((yv + 0.5*np.cos(t)) % 1.0 * 255).astype(np.uint8) | |
| b = (0.5*255*np.ones_like(r)).astype(np.uint8) | |
| frame = np.stack([b, g, r], axis=2) | |
| return frame | |
| # --- Logo loader --- | |
| logo = None | |
| has_alpha = False | |
| if logo_file: | |
| logo_data = np.frombuffer(logo_file.read(), np.uint8) | |
| logo = cv2.imdecode(logo_data, cv2.IMREAD_UNCHANGED) | |
| has_alpha = logo.shape[2] == 4 if logo.ndim == 3 else False | |
| st.text(f"Logo loaded, transparency: {'Yes' if has_alpha else 'No'}") | |
| # Resize logo | |
| lh, lw = logo.shape[:2] | |
| scale = logo_size_option / max(lw, lh) | |
| new_w = int(lw * scale) | |
| new_h = int(lh * scale) | |
| logo = cv2.resize(logo, (new_w, new_h), interpolation=cv2.INTER_AREA) | |
| # --- Logo overlay --- | |
| def overlay_logo(frame, logo, has_alpha): | |
| height, width = frame.shape[:2] | |
| lh, lw = logo.shape[:2] | |
| y1, y2 = height - lh - 10, height - 10 | |
| x1, x2 = width - lw - 10, width - 10 | |
| if has_alpha: | |
| alpha = logo[:, :, 3] / 255.0 | |
| for c in range(3): | |
| frame[y1:y2, x1:x2, c] = ( | |
| alpha * logo[:, :, c] + (1 - alpha) * frame[y1:y2, x1:x2, c] | |
| ) | |
| else: | |
| frame[y1:y2, x1:x2] = logo[:, :, :3] | |
| return frame | |
| # --- GPU detection --- | |
| def has_nvenc(): | |
| try: | |
| result = subprocess.run(["ffmpeg", "-encoders"], capture_output=True, text=True) | |
| return "h264_nvenc" in result.stdout | |
| except Exception: | |
| return False | |
| # --- Preview generator --- | |
| def generate_preview(): | |
| if not video_file: | |
| status_placeholder.warning("Upload a video first!") | |
| return | |
| cap = cv2.VideoCapture(str(video_path)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 30 | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| st.text(f"Preview resolution: {width} x {height} @ {fps:.1f} FPS") | |
| tmp_out = Path("temp_frames.mp4") | |
| out = cv2.VideoWriter(str(tmp_out), | |
| cv2.VideoWriter_fourcc(*'mp4v'), | |
| fps, (width, height)) | |
| t = 0.0 | |
| dt = 1.0 / fps | |
| frame_idx = 0 | |
| preview_display = st.empty() | |
| progress_bar = st.progress(0) | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Shader | |
| if shader_option == "Yes": | |
| shader = shader_frame(width, height, t) | |
| blended = cv2.addWeighted(frame, 0.7, shader, 0.3, 0) | |
| else: | |
| blended = frame.copy() | |
| # Logo | |
| if logo is not None: | |
| blended = overlay_logo(blended, logo, has_alpha) | |
| out.write(blended) | |
| # Live preview | |
| preview_small = cv2.resize(blended, (width // 2, height // 2)) | |
| preview_display.image(preview_small, channels="BGR") | |
| t += dt | |
| frame_idx += 1 | |
| progress_bar.progress(min(frame_idx / max(total_frames, 1), 1.0)) | |
| cap.release() | |
| out.release() | |
| # --- Merge audio --- | |
| output_path = Path("preview.mp4") | |
| encoder = "h264_nvenc" if has_nvenc() else "libx264" | |
| st.info(f"Encoding with: {encoder}") | |
| cmd = [ | |
| "ffmpeg", "-y", | |
| "-i", str(tmp_out), | |
| "-i", str(video_path), | |
| "-map", "0:v:0", | |
| "-map", "1:a:0", | |
| "-c:v", encoder, | |
| "-preset", "fast", | |
| "-b:v", "5M", | |
| "-c:a", "aac", | |
| str(output_path) | |
| ] | |
| try: | |
| subprocess.run(cmd, check=True) | |
| st.success("✅ Preview generated successfully with audio!") | |
| st.video(str(output_path)) | |
| st.markdown(f"[📥 Download preview](./{output_path})") | |
| except subprocess.CalledProcessError as e: | |
| st.error("❌ FFmpeg failed to merge audio.") | |
| st.text(e) | |
| # --- Buttons --- | |
| if st.button("Generate Preview"): | |
| generate_preview() | |
| if st.button("Go Live"): | |
| if not video_file: | |
| st.warning("Upload a video first!") | |
| else: | |
| st.info("Starting live stream...") | |
| # Start live stream using FFmpeg | |
| live_stream_url = "rtmp://your-rtmp-server/live/stream" | |
| live_stream_cmd = [ | |
| "ffmpeg", "-re", | |
| "-i", str(video_path), | |
| "-c:v", "libx264", | |
| "-preset", "fast", | |
| "-b:v", "5M", | |
| "-c:a", "aac", | |
| "-f", "flv", | |
| live_stream_url | |
| ] | |
| try: | |
| subprocess.Popen(live_stream_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| st.success("✅ Live stream started successfully!") | |
| except subprocess.CalledProcessError as e: | |
| st.error("❌ Failed to start live stream.") | |
| st.text(e) |