| | |
| | """ |
| | Modern UI for Video Background Replacer (PRO) |
| | - File-based progress polling to keep session alive |
| | - Real-time progress updates during long-running operations |
| | """ |
| | import streamlit as st |
| | import os |
| | from pathlib import Path |
| | from PIL import Image |
| | import numpy as np |
| | import logging |
| | import time |
| | import threading |
| |
|
| | from utils.progress_tracker import get_progress |
| |
|
# Logger used by the UI helpers in this module to record button clicks and
# handled errors.
logger = logging.getLogger("Advanced Video Background Replacer")
# Build identifier shown in the sidebar caption by render_ui.
UI_BUILD = "ui-2025-10-04-17-00Z"
| |
|
def tail_file(path: str, lines: int = 400) -> str:
    """Return the last *lines* lines of the text file at *path*.

    The file is decoded as UTF-8 with undecodable bytes replaced, so stray
    binary data in a log does not raise.

    Args:
        path: Location of the file to read.
        lines: Maximum number of trailing lines to return. Zero or a
            negative value yields an empty string.

    Returns:
        The trailing lines joined into a single string,
        ``"(log file not found)"`` when *path* does not exist, or
        ``"(failed to read log: ...)"`` when reading fails for any other
        reason.
    """
    from collections import deque  # local import keeps the block self-contained

    try:
        # Open first (EAFP) instead of checking os.path.exists(): the file may
        # vanish between the check and the open.
        with open(path, "r", encoding="utf-8", errors="replace") as handle:
            wanted = int(lines)
            if wanted <= 0:
                # A ``content[-0:]`` slice would return the *whole* file.
                return ""
            # A bounded deque keeps only the tail in memory while streaming
            # the file line by line, instead of loading every line.
            return "".join(deque(handle, maxlen=wanted))
    except FileNotFoundError:
        return "(log file not found)"
    except Exception as exc:
        # Best-effort UI helper: report the problem as text rather than raise.
        return f"(failed to read log: {exc})"
| |
|
def read_file_bytes(path: str) -> bytes:
    """Return the raw contents of *path*, or ``b""`` when it cannot be read.

    A missing file and any error raised while checking or reading it are
    both reported as an empty byte string; nothing is raised to the caller.
    """
    payload = b""
    try:
        if os.path.exists(path):
            with open(path, "rb") as stream:
                payload = stream.read()
    except Exception:
        # Deliberate best-effort: callers treat "no bytes" as "not available".
        payload = b""
    return payload
| |
|
def _load_rgb_image(source):
    """Open *source* (a path or binary file object) and return an RGB copy.

    The image is opened in a ``with`` block so that a file handle Pillow
    opened itself is released even when decoding fails part-way.
    """
    with Image.open(source) as img:
        return img.convert("RGB")


def _render_background_settings():
    """Render the "2. Background Settings" section and return the selection.

    Returns:
        A ``(background, bg_type)`` tuple. ``bg_type`` is the selected radio
        label (``"Image"``, ``"Color"``, ``"Stock"`` or ``"AI Prompt"``).
        ``background`` is an RGB PIL image for the image-based choices, the
        ``#rrggbb`` string from the colour picker for ``"Color"``, or ``None``
        when nothing usable has been selected yet (no upload, unreadable
        image, missing stock file, no generated background).
    """
    stock_images = {
        "Sunset Beach": "stock_images/sunset_beach.jpg",
        "Urban Office": "stock_images/urban_office.jpg",
        "Studio Lighting": "stock_images/studio_light.jpg",
    }
    st.header("2. Background Settings")
    bg_type = st.radio(
        "Select Background Type:",
        ["Image", "Color", "Stock", "AI Prompt"],
        horizontal=True,
        key="bg_type_radio",
    )
    background = None

    if bg_type == "Image":
        bg_image = st.file_uploader(
            "Upload Background Image",
            type=["jpg", "png", "jpeg"],
            key="bg_image_uploader",
        )
        if bg_image is not None:
            bg_image.seek(0)
            try:
                background = _load_rgb_image(bg_image)
            except OSError as exc:
                # Pillow reports unreadable/corrupt files as OSError
                # subclasses; surface that here instead of letting it abort
                # the rest of the page.
                st.error(f"Could not read the uploaded image: {exc}")
            else:
                st.image(background, caption="Selected Background", use_column_width=True)

    elif bg_type == "Color":
        selected_color = st.color_picker(
            "Choose Background Color",
            st.session_state.get('bg_color', "#00FF00"),
            key="color_picker",
        )
        background = selected_color
        # Convert "#rrggbb" into an (r, g, b) tuple for the preview swatch.
        hex_digits = selected_color.lstrip('#')
        rgb = tuple(int(hex_digits[i:i + 2], 16) for i in (0, 2, 4))
        color_preview = np.full((100, 100, 3), rgb, dtype=np.uint8)
        st.image(color_preview, caption="Selected Color", width=200)

    elif bg_type == "Stock":
        stock_choice = st.selectbox(
            "Choose a professional stock background:",
            list(stock_images.keys()),
            key="stock_image_select",
        )
        stock_img_path = stock_images[stock_choice]
        try:
            background = _load_rgb_image(stock_img_path)
        except FileNotFoundError:
            st.warning(f"Stock image not found: {stock_img_path}. Upload your own image or choose another.")
        except OSError as exc:
            # File exists but cannot be decoded or read.
            st.warning(f"Stock image could not be loaded: {stock_img_path} ({exc}). Upload your own image or choose another.")
        else:
            st.image(background, caption=stock_choice, use_column_width=True)

    elif bg_type == "AI Prompt":
        prompt = st.text_input("Describe the background to generate (AI):", key="ai_bg_prompt")
        # The button is evaluated first so the widget is always rendered,
        # even when the prompt is empty.
        if st.button("Generate Background", key="gen_bg_btn") and prompt:
            # Placeholder image: a solid-colour stub, not a real generator.
            background = Image.new("RGB", (512, 320), (64, 32, 96))
            st.session_state.generated_bg = background
            st.success("AI-generated background (stub). Replace with your generator!")
        elif "generated_bg" in st.session_state:
            # Reuse the background generated on an earlier run of the script.
            background = st.session_state.generated_bg
        if background is not None:
            st.image(background, caption="Generated Background", use_column_width=True)

    return background, bg_type
| |
|
class _MemFile:
    """Minimal in-memory stand-in for an uploaded file.

    Exposes ``name``, ``read()`` and ``seek()`` over a ``BytesIO`` copy of the
    given bytes so the cached upload can be handed to the processing callback.
    """

    def __init__(self, name, data):
        import io  # local import: ``io`` is not imported at module level

        self.name = name
        self._b = io.BytesIO(data)

    def read(self):
        # Always rewind first so every call returns the complete payload.
        self._b.seek(0)
        return self._b.read()

    def seek(self, pos):
        self._b.seek(pos)


def _render_sidebar():
    """Render the sidebar: build stamp, GPU status and log download/tail tools."""
    log_path = "/tmp/app.log"
    with st.sidebar:
        st.subheader("System Status")
        st.caption(f"UI build: {UI_BUILD}")
        st.markdown(f"**Log file:** `{log_path}`")

        if st.session_state.get('gpu_available', False):
            try:
                import torch
                dev = torch.cuda.get_device_name(0)
            except Exception:
                # torch missing or the device query failed; keep the UI usable.
                dev = "Detected (name unavailable)"
            st.success(f"GPU: {dev}")
        else:
            st.error("GPU: Not Available")

        st.number_input("Tail last N lines", min_value=50, max_value=5000, step=50, key="log_tail_lines")
        log_bytes = read_file_bytes(log_path)
        st.download_button(
            "Download Logs",
            data=log_bytes if log_bytes else b"Log file not available yet.",
            file_name="app.log",
            mime="text/plain",
            use_container_width=True,
            disabled=not log_bytes,
        )
        with st.expander("View Log Tail", expanded=True):
            if st.button("Refresh log", use_container_width=True, key="refresh_log_btn"):
                st.session_state['_last_log_refresh'] = time.time()
            log_text = tail_file(log_path, st.session_state.get('log_tail_lines', 400))
            st.code(log_text, language="text")


def _render_upload_column():
    """Render the video uploader and preview; cache the uploaded bytes in session state."""
    st.header("1. Upload Video")
    uploaded_video = st.file_uploader(
        "Upload Video",
        type=["mp4", "mov", "avi", "mkv", "webm"],
        key="video_uploader",
    )
    st.markdown("### Video Preview")
    preview_slot = st.empty()
    if uploaded_video is None:
        preview_slot.empty()
        return
    try:
        uploaded_video.seek(0)
        video_bytes = uploaded_video.read()
        # Cached so the Process button can use the bytes on a later rerun.
        st.session_state.video_bytes_cache = video_bytes
        with preview_slot.container():
            st.video(video_bytes)
    except Exception as e:
        logger.error("[UI] Video preview error: %s", e, exc_info=True)
        preview_slot.error(f"Cannot display video: {e}")


def _poll_progress(worker, progress_bar, status_text, stage_status):
    """Mirror ``get_progress()`` into the widgets until the run is over.

    Polling stops when the status reports an error or completion, or when the
    worker thread has exited. The last condition prevents an endless loop if
    the worker dies without writing a final status.
    """
    while True:
        status = get_progress()

        if status.get('active') or not status.get('complete'):
            progress_bar.progress(status.get('progress', 0))
            status_text.info(f"**Status:** {status.get('message', 'Processing...')}")
            stage_status.markdown(f"**Current Stage:** {status.get('stage', 'Unknown')}")

        if status.get('error'):
            status_text.error(f"**Error:** {status['error']}")
            return
        if status.get('complete'):
            return
        if not worker.is_alive():
            # Nothing is left to update the progress status; stop waiting.
            return

        time.sleep(1)


def _report_final_status(progress_bar, status_text):
    """Show the outcome of a finished run based on progress status and session state."""
    final_status = get_progress()
    error = final_status.get('error')
    if error:
        progress_bar.progress(final_status.get('progress', 0))
        status_text.error(f"**Status:** Processing failed - {error}")
        st.error(f"Processing failed: {error}")
    elif st.session_state.get('processed_video_bytes'):
        # NOTE(review): a result left over from an earlier run would also take
        # this branch - confirm the processing callback clears it on start.
        progress_bar.progress(100)
        status_text.success("**Status:** Processing complete!")
        st.success("Video processing complete!")
    else:
        status_text.error("**Status:** Processing failed. Check logs.")
        st.error("Processing failed. Check logs for details.")


def _render_process_column(process_video_func):
    """Render background settings, progress widgets and the Process button.

    When the button is pressed, ``process_video_func`` is started in a daemon
    thread with ``(video_file_like, background, bg_type.lower())`` and this
    function blocks, polling progress, until the run ends.
    """
    background, bg_type = _render_background_settings()
    st.header("3. Process Video")

    with st.container():
        progress_bar = st.progress(0)
        status_text = st.empty()
        stage_status = st.empty()

    can_process = (
        st.session_state.get('video_bytes_cache') is not None
        and not st.session_state.get('processing', False)
        and background is not None
    )

    if not st.button("Process Video", disabled=not can_process, use_container_width=True):
        return

    try:
        logger.info("Process Video button clicked")

        st.session_state.processing = True
        mem_video = _MemFile("uploaded.mp4", st.session_state.video_bytes_cache)

        worker = threading.Thread(
            target=process_video_func,
            args=(mem_video, background, bg_type.lower()),
            daemon=True,
        )
        worker.start()

        _poll_progress(worker, progress_bar, status_text, stage_status)
        worker.join(timeout=5)

        if not worker.is_alive():
            # Clear the flag here as well: if the worker never resets it, the
            # Process button would stay disabled for the rest of the session.
            st.session_state.processing = False

        _report_final_status(progress_bar, status_text)

    except Exception as e:
        st.session_state.processing = False
        logger.error("[UI] Process video error: %s", e, exc_info=True)
        status_text.error(f"**Status:** Error - {str(e)}")
        st.error(f"Processing error: {str(e)}. Check logs for details.")


def _render_processed_video():
    """Show the processed video and its download button when a result exists."""
    if st.session_state.get('processed_video_bytes') is None:
        return
    st.markdown("---")
    st.markdown("### Processed Video")
    try:
        st.video(st.session_state.processed_video_bytes)
        st.download_button(
            label="Download Processed Video",
            data=st.session_state.processed_video_bytes,
            file_name="processed_video.mp4",
            mime="video/mp4",
            use_container_width=True,
        )
    except Exception as e:
        logger.error("[UI] Display error: %s", e, exc_info=True)
        st.error(f"Display error: {e}")


def _render_last_error():
    """Show ``st.session_state.last_error`` with a button that clears it."""
    if not st.session_state.get('last_error'):
        return
    with st.expander("Last Error", expanded=True):
        st.error(st.session_state.last_error)
        if st.button("Clear Error"):
            st.session_state.last_error = None


def render_ui(process_video_func):
    """Render the whole page: sidebar, upload column, processing column, results.

    Args:
        process_video_func: Callable invoked in a background thread as
            ``process_video_func(video_file_like, background, bg_type)`` where
            ``bg_type`` is the lower-cased background type label.

    Any exception raised while rendering is logged and shown as an error
    message instead of propagating.
    """
    try:
        _render_sidebar()

        col1, col2 = st.columns([1, 1], gap="large")
        with col1:
            _render_upload_column()
        with col2:
            _render_process_column(process_video_func)

        _render_processed_video()
        _render_last_error()

    except Exception as e:
        logger.error("[UI] Render UI error: %s", e, exc_info=True)
        st.error(f"UI rendering error: {str(e)}. Check logs for details.")