#!/usr/bin/env python3
"""
Modern UI for Video Background Replacer (PRO)
- File-based progress polling to keep session alive
- Real-time progress updates during long-running operations

The public entry point is ``render_ui(process_video_func)``.  The remaining
names are small helpers used while drawing the Streamlit page.
"""
import io
import logging
import math
import os
import threading
import time
from collections import deque
from pathlib import Path

import numpy as np
import streamlit as st
from PIL import Image

from utils.progress_tracker import get_progress

logger = logging.getLogger("Advanced Video Background Replacer")

UI_BUILD = "ui-2025-10-04-17-00Z"

# Log file shown, tailed and offered for download in the sidebar.
LOG_PATH = "/tmp/app.log"

# Seconds between two reads of the progress tracker while the worker runs.
POLL_INTERVAL_SECONDS = 1

# Seconds to wait for the worker thread once the polling loop has ended.
WORKER_JOIN_TIMEOUT_SECONDS = 5

# Display name -> path of the bundled stock backgrounds.
STOCK_IMAGES = {
    "Sunset Beach": "stock_images/sunset_beach.jpg",
    "Urban Office": "stock_images/urban_office.jpg",
    "Studio Lighting": "stock_images/studio_light.jpg",
}


def tail_file(path: str, lines: int = 400) -> str:
    """Return the last ``lines`` lines of the text file at ``path``.

    Never raises: problems are reported as a short parenthesised message so
    the result can always be shown in the UI.

    Args:
        path: File to read (decoded as UTF-8, undecodable bytes replaced).
        lines: Maximum number of trailing lines to return.  A non-positive
            value yields an empty string.

    Returns:
        The joined trailing lines, ``"(log file not found)"`` when the file
        does not exist, or ``"(failed to read log: ...)"`` on a read error.
    """
    if not os.path.exists(path):
        return "(log file not found)"
    try:
        wanted = int(lines)
        if wanted <= 0:
            # A slice like content[-0:] would return the whole file.
            return ""
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # A bounded deque keeps only the tail in memory while streaming.
            return "".join(deque(f, maxlen=wanted))
    except (OSError, TypeError, ValueError) as e:
        return f"(failed to read log: {e})"


def read_file_bytes(path: str) -> bytes:
    """Return the raw content of ``path``, or ``b""`` if it cannot be read."""
    try:
        with open(path, "rb") as f:
            return f.read()
    except (OSError, TypeError, ValueError):
        # Best effort by design: a missing or unreadable file is reported
        # to the caller as "no data".
        return b""


class _MemFile:
    """Minimal in-memory stand-in for an uploaded file (name/read/seek)."""

    def __init__(self, name, data):
        self.name = name
        self._b = io.BytesIO(data)

    def read(self):
        """Return the full content, regardless of the current position."""
        self._b.seek(0)
        return self._b.read()

    def seek(self, pos):
        """Move the underlying buffer position to ``pos``."""
        self._b.seek(pos)


def _clamp_progress(value):
    """Coerce a progress value into something ``st.progress`` accepts.

    ``st.progress`` takes an int in [0, 100] or a float in [0.0, 1.0] and
    raises for anything else.  Floats up to 1.0 are treated as fractions;
    every other number is treated as a percentage and clamped to 0..100.
    Values that are not numeric (or NaN) map to 0.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return 0
    if math.isnan(number):
        return 0
    if isinstance(value, float) and number <= 1.0:
        return max(number, 0.0)
    return int(min(max(number, 0.0), 100.0))


def _hex_to_rgb(hex_color):
    """Convert a ``#rrggbb`` string into an ``(r, g, b)`` tuple of ints."""
    digits = hex_color.lstrip('#')
    return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))


def _render_background_settings():
    """Draw section 2 and return the chosen background.

    Returns:
        A ``(background, bg_type)`` tuple.  ``bg_type`` is the selected radio
        label.  ``background`` is a PIL RGB image for "Image", "Stock" and
        "AI Prompt", the hex colour string for "Color", or ``None`` when
        nothing usable has been selected yet.
    """
    st.header("2. Background Settings")
    bg_type = st.radio(
        "Select Background Type:",
        ["Image", "Color", "Stock", "AI Prompt"],
        horizontal=True,
        key="bg_type_radio"
    )
    background = None

    if bg_type == "Image":
        bg_image = st.file_uploader(
            "Upload Background Image",
            type=["jpg", "png", "jpeg"],
            key="bg_image_uploader"
        )
        if bg_image is not None:
            try:
                bg_image.seek(0)
                background = Image.open(bg_image).convert("RGB")
            except (OSError, ValueError) as e:
                # A corrupt upload should not take the whole page down.
                logger.error("[UI] Background image error: %s", e, exc_info=True)
                st.error(f"Cannot read background image: {e}")
                background = None
            else:
                st.image(background, caption="Selected Background", use_column_width=True)

    elif bg_type == "Color":
        selected_color = st.color_picker(
            "Choose Background Color",
            st.session_state.get('bg_color', "#00FF00"),
            key="color_picker"
        )
        background = selected_color
        color_preview = np.full((100, 100, 3), _hex_to_rgb(selected_color), dtype=np.uint8)
        st.image(color_preview, caption="Selected Color", width=200)

    elif bg_type == "Stock":
        stock_choice = st.selectbox(
            "Choose a professional stock background:",
            list(STOCK_IMAGES),
            key="stock_image_select"
        )
        stock_img_path = STOCK_IMAGES[stock_choice]
        try:
            background = Image.open(stock_img_path).convert("RGB")
            st.image(background, caption=stock_choice, use_column_width=True)
        except FileNotFoundError:
            st.warning(f"Stock image not found: {stock_img_path}. Upload your own image or choose another.")
            background = None

    elif bg_type == "AI Prompt":
        prompt = st.text_input("Describe the background to generate (AI):", key="ai_bg_prompt")
        ai_ready = False
        if st.button("Generate Background", key="gen_bg_btn") and prompt:
            # Stub: a flat-colour placeholder stands in for a real generator.
            background = Image.new("RGB", (512, 320), (64, 32, 96))
            st.session_state.generated_bg = background
            st.success("AI-generated background (stub). Replace with your generator!")
            ai_ready = True
        elif "generated_bg" in st.session_state:
            # Reuse the image produced on an earlier rerun.
            background = st.session_state.generated_bg
            ai_ready = True
        if ai_ready and background is not None:
            st.image(background, caption="Generated Background", use_column_width=True)

    return background, bg_type


def _render_sidebar():
    """Draw the sidebar: build id, GPU status, log download and log tail."""
    with st.sidebar:
        st.subheader("System Status")
        st.caption(f"UI build: {UI_BUILD}")
        st.markdown(f"**Log file:** `{LOG_PATH}`")

        if st.session_state.get('gpu_available', False):
            try:
                import torch
                dev = torch.cuda.get_device_name(0)
            except Exception:
                # torch missing or CUDA query failed; the flag still says GPU.
                dev = "Detected (name unavailable)"
            st.success(f"GPU: {dev}")
        else:
            st.error("GPU: Not Available")

        st.number_input("Tail last N lines", min_value=50, max_value=5000, step=50, key="log_tail_lines")

        log_bytes = read_file_bytes(LOG_PATH)
        st.download_button(
            "Download Logs",
            data=log_bytes if log_bytes else b"Log file not available yet.",
            file_name="app.log",
            mime="text/plain",
            use_container_width=True,
            disabled=not bool(log_bytes)
        )

        with st.expander("View Log Tail", expanded=True):
            if st.button("Refresh log", use_container_width=True, key="refresh_log_btn"):
                # The click itself triggers a rerun; the timestamp is only recorded.
                st.session_state['_last_log_refresh'] = time.time()
            log_text = tail_file(LOG_PATH, st.session_state.get('log_tail_lines', 400))
            st.code(log_text, language="text")


def _render_video_upload():
    """Draw section 1 and cache the uploaded bytes in the session state."""
    st.header("1. Upload Video")
    uploaded_video = st.file_uploader(
        "Upload Video",
        type=["mp4", "mov", "avi", "mkv", "webm"],
        key="video_uploader"
    )
    st.markdown("### Video Preview")
    video_preview_placeholder = st.empty()

    if uploaded_video is None:
        video_preview_placeholder.empty()
        return

    try:
        uploaded_video.seek(0)
        video_bytes = uploaded_video.read()
        st.session_state.video_bytes_cache = video_bytes
        with video_preview_placeholder.container():
            st.video(video_bytes)
    except Exception as e:
        logger.error("[UI] Video preview error: %s", e, exc_info=True)
        video_preview_placeholder.error(f"Cannot display video: {e}")


def _run_processing(process_video_func, background, bg_type,
                    progress_bar, status_text, stage_status):
    """Run ``process_video_func`` in a worker thread and mirror its progress.

    The worker receives ``(file-like video, background, bg_type.lower())``.
    While it is alive the progress tracker is polled and the three widgets
    are updated.  Exceptions propagate to the caller.
    """
    logger.info("Process Video button clicked")
    st.session_state.processing = True
    mem_video = _MemFile("uploaded.mp4", st.session_state.video_bytes_cache)

    # Start processing in background thread
    worker = threading.Thread(
        target=process_video_func,
        args=(mem_video, background, bg_type.lower()),
        daemon=True
    )
    worker.start()

    # Poll only while the worker is alive: looping on the session's
    # 'processing' flag as well would spin forever if the worker died
    # without ever reporting 'complete' or 'error'.
    while worker.is_alive():
        status = get_progress() or {}
        if status.get('active') or not status.get('complete'):
            progress_bar.progress(_clamp_progress(status.get('progress', 0)))
            status_text.info(f"**Status:** {status.get('message', 'Processing...')}")
            stage_status.markdown(f"**Current Stage:** {status.get('stage', 'Unknown')}")
        if status.get('error'):
            status_text.error(f"**Error:** {status['error']}")
            break
        if status.get('complete'):
            break
        time.sleep(POLL_INTERVAL_SECONDS)

    # Wait for thread to finish
    worker.join(timeout=WORKER_JOIN_TIMEOUT_SECONDS)
    if not worker.is_alive():
        # Re-enable the button; a still-running worker keeps the flag set.
        st.session_state.processing = False

    # Final status check
    final_status = get_progress() or {}
    if final_status.get('error'):
        progress_bar.progress(_clamp_progress(final_status.get('progress', 0)))
        status_text.error(f"**Status:** Processing failed - {final_status['error']}")
        st.error(f"Processing failed: {final_status['error']}")
    elif st.session_state.get('processed_video_bytes'):
        progress_bar.progress(100)
        status_text.success("**Status:** Processing complete!")
        st.success("Video processing complete!")
    else:
        status_text.error("**Status:** Processing failed. Check logs.")
        st.error("Processing failed. Check logs for details.")


def _render_processing_section(process_video_func, background, bg_type):
    """Draw section 3: progress widgets and the "Process Video" button."""
    st.header("3. Process Video")
    progress_container = st.container()
    with progress_container:
        progress_bar = st.progress(0)
        status_text = st.empty()
        stage_status = st.empty()

    can_process = (
        st.session_state.get('video_bytes_cache') is not None
        and not st.session_state.get('processing', False)
        and (background is not None)
    )

    if st.button("Process Video", disabled=not can_process, use_container_width=True):
        try:
            _run_processing(
                process_video_func, background, bg_type,
                progress_bar, status_text, stage_status,
            )
        except Exception as e:
            st.session_state.processing = False
            logger.error("[UI] Process video error: %s", e, exc_info=True)
            status_text.error(f"**Status:** Error - {str(e)}")
            st.error(f"Processing error: {str(e)}. Check logs for details.")


def _render_results():
    """Show the processed video (if any) and the last recorded error."""
    if st.session_state.get('processed_video_bytes') is not None:
        st.markdown("---")
        st.markdown("### Processed Video")
        try:
            st.video(st.session_state.processed_video_bytes)
            st.download_button(
                label="Download Processed Video",
                data=st.session_state.processed_video_bytes,
                file_name="processed_video.mp4",
                mime="video/mp4",
                use_container_width=True
            )
        except Exception as e:
            logger.error("[UI] Display error: %s", e, exc_info=True)
            st.error(f"Display error: {e}")

    if st.session_state.get('last_error'):
        with st.expander("Last Error", expanded=True):
            st.error(st.session_state.last_error)
            if st.button("Clear Error"):
                st.session_state.last_error = None


def render_ui(process_video_func):
    """Render the whole page.

    Args:
        process_video_func: Callable started in a daemon thread as
            ``process_video_func(video_file, background, bg_type)`` where
            ``video_file`` offers ``name``/``read()``/``seek()`` and
            ``bg_type`` is the lower-cased background type label.  The UI
            reads its progress through ``get_progress()`` and its result
            from ``st.session_state['processed_video_bytes']`` — presumably
            both are written by this callable; verify against its source.

    Any exception raised while rendering is logged and shown as an error
    message instead of propagating.
    """
    try:
        _render_sidebar()

        col1, col2 = st.columns([1, 1], gap="large")

        with col1:
            _render_video_upload()

        # NOTE(review): sections 2, 3 and the results are placed together in
        # the right-hand column; confirm this matches the intended layout.
        with col2:
            background, bg_type = _render_background_settings()
            _render_processing_section(process_video_func, background, bg_type)
            _render_results()

    except Exception as e:
        logger.error("[UI] Render UI error: %s", e, exc_info=True)
        st.error(f"UI rendering error: {str(e)}. Check logs for details.")