|
|
|
|
|
"""
Modern UI for Video Background Replacer (PRO)

- File-based progress polling to keep session alive
- Real-time progress updates during long-running operations
"""
|
|
import io
import logging
import os
import threading
import time
from collections import deque
from pathlib import Path

import numpy as np
import streamlit as st
from PIL import Image

from utils.progress_tracker import get_progress
|
|
|
|
|
# Module-wide logger; it is registered under the application title rather than __name__.
logger = logging.getLogger("Advanced Video Background Replacer")
# Build tag of this UI revision; render_ui() prints it in the sidebar caption.
UI_BUILD = "ui-2025-10-04-17-00Z"
|
|
|
|
|
def tail_file(path: str, lines: int = 400) -> str:
    """Return the last ``lines`` lines of a text file as a single string.

    The file is streamed through a bounded deque, so only the requested tail
    is held in memory instead of the whole log.

    Args:
        path: File to read. It is decoded as UTF-8; undecodable bytes are
            replaced rather than raising.
        lines: Maximum number of trailing lines to return. Zero or a negative
            value yields an empty string.

    Returns:
        The joined tail of the file, ``"(log file not found)"`` when ``path``
        does not exist, or ``"(failed to read log: ...)"`` when reading fails.
    """
    if not os.path.exists(path):
        return "(log file not found)"
    try:
        limit = int(lines)
        if limit <= 0:
            # A plain ``content[-0:]`` slice would return the entire file.
            return ""
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return "".join(deque(f, maxlen=limit))
    except Exception as e:
        # Best-effort helper for the UI: report the problem as text.
        return f"(failed to read log: {e})"
|
|
|
|
|
def read_file_bytes(path: str) -> bytes:
    """Read a file's raw contents, falling back to empty bytes.

    Args:
        path: Location of the file to read in binary mode.

    Returns:
        The full file contents, or ``b""`` when the path does not exist or
        any exception is raised while checking or reading it.
    """
    payload = b""
    try:
        if os.path.exists(path):
            with open(path, "rb") as handle:
                payload = handle.read()
    except Exception:
        # Best-effort read: callers treat an empty result as "not available".
        payload = b""
    return payload
|
|
|
|
|
def _render_background_settings():
    """Render the "2. Background Settings" section and return the selection.

    Depending on the chosen radio option the user uploads an image, picks a
    colour, selects a bundled stock image, or triggers the (stub) AI
    generator. A preview of the current choice is shown in every case.

    Returns:
        tuple: ``(background, bg_type)``. ``bg_type`` is the selected radio
        label ("Image", "Color", "Stock" or "AI Prompt"). ``background`` is a
        PIL RGB image, the colour-picker string for "Color", or ``None`` when
        nothing usable has been selected yet.
    """
    stock_images = {
        "Sunset Beach": "stock_images/sunset_beach.jpg",
        "Urban Office": "stock_images/urban_office.jpg",
        "Studio Lighting": "stock_images/studio_light.jpg",
    }
    st.header("2. Background Settings")
    bg_type = st.radio(
        "Select Background Type:",
        ["Image", "Color", "Stock", "AI Prompt"],
        horizontal=True,
        key="bg_type_radio"
    )
    background = None

    # NOTE(review): ``use_column_width`` appears to be deprecated in newer
    # Streamlit releases in favour of ``use_container_width`` - confirm
    # against the pinned version before switching.
    if bg_type == "Image":
        bg_image = st.file_uploader(
            "Upload Background Image",
            type=["jpg", "png", "jpeg"],
            key="bg_image_uploader"
        )
        if bg_image is not None:
            try:
                bg_image.seek(0)
                background = Image.open(bg_image).convert("RGB")
            except OSError as exc:
                # Unreadable/corrupt upload: report it here instead of letting
                # the exception abort rendering of the whole page.
                logger.error("[UI] Background image error: %s", exc, exc_info=True)
                st.error(f"Cannot read background image: {exc}")
                background = None
            else:
                st.image(background, caption="Selected Background", use_column_width=True)

    elif bg_type == "Color":
        selected_color = st.color_picker(
            "Choose Background Color",
            st.session_state.get('bg_color', "#00FF00"),
            key="color_picker"
        )
        background = selected_color
        # Convert the "#rrggbb" string into an (r, g, b) tuple for the swatch.
        hex_digits = selected_color.lstrip('#')
        rgb = tuple(int(hex_digits[i:i + 2], 16) for i in (0, 2, 4))
        color_preview = np.full((100, 100, 3), rgb, dtype=np.uint8)
        st.image(color_preview, caption="Selected Color", width=200)

    elif bg_type == "Stock":
        stock_choice = st.selectbox(
            "Choose a professional stock background:",
            list(stock_images.keys()),
            key="stock_image_select"
        )
        stock_img_path = stock_images[stock_choice]
        try:
            background = Image.open(stock_img_path).convert("RGB")
        except FileNotFoundError:
            st.warning(f"Stock image not found: {stock_img_path}. Upload your own image or choose another.")
            background = None
        except OSError as exc:
            # The file exists but cannot be decoded or read.
            st.warning(f"Stock image could not be loaded: {stock_img_path} ({exc}). Upload your own image or choose another.")
            background = None
        else:
            st.image(background, caption=stock_choice, use_column_width=True)

    elif bg_type == "AI Prompt":
        prompt = st.text_input("Describe the background to generate (AI):", key="ai_bg_prompt")
        ai_ready = False
        if st.button("Generate Background", key="gen_bg_btn") and prompt:
            # Placeholder generator: a flat-colour image kept in session state
            # so that it is still available on later reruns.
            background = Image.new("RGB", (512, 320), (64, 32, 96))
            st.session_state.generated_bg = background
            st.success("AI-generated background (stub). Replace with your generator!")
            ai_ready = True
        elif "generated_bg" in st.session_state:
            background = st.session_state.generated_bg
            ai_ready = True
        if ai_ready and background is not None:
            st.image(background, caption="Generated Background", use_column_width=True)

    return background, bg_type
|
|
|
|
|
_LOG_FILE = "/tmp/app.log"


class _MemFile:
    """Small in-memory file object handed to the processing function.

    It exposes ``name``, ``read()`` (always returns the complete payload) and
    ``seek()``.
    """

    def __init__(self, name, data):
        self.name = name
        self._b = io.BytesIO(data)

    def read(self):
        # Always rewind so that every call returns the full content.
        self._b.seek(0)
        return self._b.read()

    def seek(self, pos):
        self._b.seek(pos)


def _render_sidebar():
    """Draw the sidebar: build tag, GPU status, log download and log tail."""
    with st.sidebar:
        st.subheader("System Status")
        st.caption(f"UI build: {UI_BUILD}")
        st.markdown(f"**Log file:** `{_LOG_FILE}`")

        if st.session_state.get('gpu_available', False):
            try:
                # Imported lazily so the UI still renders without torch.
                import torch
                dev = torch.cuda.get_device_name(0)
            except Exception:
                dev = "Detected (name unavailable)"
            st.success(f"GPU: {dev}")
        else:
            st.error("GPU: Not Available")

        st.number_input("Tail last N lines", min_value=50, max_value=5000, step=50, key="log_tail_lines")
        log_bytes = read_file_bytes(_LOG_FILE)
        st.download_button(
            "Download Logs",
            data=log_bytes if log_bytes else b"Log file not available yet.",
            file_name="app.log",
            mime="text/plain",
            use_container_width=True,
            disabled=not bool(log_bytes)
        )
        with st.expander("View Log Tail", expanded=True):
            # Clicking the button reruns the script, which re-reads the log
            # below; the stored timestamp is not read anywhere in this module.
            if st.button("Refresh log", use_container_width=True, key="refresh_log_btn"):
                st.session_state['_last_log_refresh'] = time.time()
            log_text = tail_file(_LOG_FILE, st.session_state.get('log_tail_lines', 400))
            st.code(log_text, language="text")


def _render_video_upload():
    """Draw the upload widget and preview; cache the video bytes in session state."""
    st.header("1. Upload Video")
    uploaded_video = st.file_uploader(
        "Upload Video",
        type=["mp4", "mov", "avi", "mkv", "webm"],
        key="video_uploader"
    )
    st.markdown("### Video Preview")
    video_preview_placeholder = st.empty()
    if uploaded_video is None:
        # NOTE(review): a previously cached video stays in
        # ``video_bytes_cache`` when the uploader is cleared - confirm that
        # this is intended.
        video_preview_placeholder.empty()
        return
    try:
        uploaded_video.seek(0)
        video_bytes = uploaded_video.read()
        st.session_state.video_bytes_cache = video_bytes
        with video_preview_placeholder.container():
            st.video(video_bytes)
    except Exception as e:
        logger.error(f"[UI] Video preview error: {e}", exc_info=True)
        video_preview_placeholder.error(f"Cannot display video: {e}")


def _poll_progress(worker, progress_bar, status_text, stage_status):
    """Mirror ``get_progress()`` into the widgets until the worker stops.

    Polls once per second and returns as soon as the worker thread has
    exited or the tracker reports an error or completion.
    """
    while worker.is_alive():
        status = get_progress()

        if status.get('active') or not status.get('complete'):
            progress_bar.progress(status.get('progress', 0))
            status_text.info(f"**Status:** {status.get('message', 'Processing...')}")
            stage_status.markdown(f"**Current Stage:** {status.get('stage', 'Unknown')}")

        if status.get('error'):
            status_text.error(f"**Error:** {status['error']}")
            break

        if status.get('complete'):
            break

        time.sleep(1)


def _run_processing(process_video_func, background, bg_type,
                    progress_bar, status_text, stage_status):
    """Run ``process_video_func`` on a daemon thread and report the outcome.

    The ``processing`` session flag is set while the job runs and is always
    cleared afterwards, so the "Process Video" button becomes usable again
    whether the job succeeded, failed, or raised.
    """
    try:
        logger.info("Process Video button clicked")

        st.session_state.processing = True
        mem_video = _MemFile("uploaded.mp4", st.session_state.video_bytes_cache)

        # NOTE(review): the callee runs on a plain thread; if it writes to
        # ``st.session_state`` (e.g. ``processed_video_bytes``), confirm the
        # Streamlit version in use makes those writes visible here.
        worker = threading.Thread(
            target=process_video_func,
            args=(mem_video, background, bg_type.lower()),
            daemon=True
        )
        worker.start()

        # Poll only while the worker is alive: a worker that dies without
        # reporting completion or an error must not leave this loop spinning.
        _poll_progress(worker, progress_bar, status_text, stage_status)

        worker.join(timeout=5)

        final_status = get_progress()
        if final_status.get('error'):
            progress_bar.progress(final_status.get('progress', 0))
            status_text.error(f"**Status:** Processing failed - {final_status['error']}")
            st.error(f"Processing failed: {final_status['error']}")
        elif st.session_state.get('processed_video_bytes'):
            progress_bar.progress(100)
            status_text.success("**Status:** Processing complete!")
            st.success("Video processing complete!")
        else:
            status_text.error("**Status:** Processing failed. Check logs.")
            st.error("Processing failed. Check logs for details.")

    except Exception as e:
        logger.error(f"[UI] Process video error: {e}", exc_info=True)
        status_text.error(f"**Status:** Error - {str(e)}")
        st.error(f"Processing error: {str(e)}. Check logs for details.")
    finally:
        st.session_state.processing = False


def _render_processed_video():
    """Show the processed video with a download button when one is stored."""
    if st.session_state.get('processed_video_bytes') is None:
        return
    st.markdown("---")
    st.markdown("### Processed Video")
    try:
        st.video(st.session_state.processed_video_bytes)
        st.download_button(
            label="Download Processed Video",
            data=st.session_state.processed_video_bytes,
            file_name="processed_video.mp4",
            mime="video/mp4",
            use_container_width=True
        )
    except Exception as e:
        logger.error(f"[UI] Display error: {e}", exc_info=True)
        st.error(f"Display error: {e}")


def _render_last_error():
    """Show ``st.session_state.last_error`` with a button that clears it."""
    if st.session_state.get('last_error'):
        with st.expander("Last Error", expanded=True):
            st.error(st.session_state.last_error)
            if st.button("Clear Error"):
                st.session_state.last_error = None


def render_ui(process_video_func):
    """Render the complete page: sidebar, upload, background, processing, result.

    Args:
        process_video_func: Callable started on a background thread as
            ``process_video_func(video_file, background, bg_type_lowercase)``
            when the user clicks "Process Video". ``video_file`` is an
            in-memory object exposing ``name``, ``read()`` and ``seek()``.

    Any exception raised while rendering is logged and shown with
    ``st.error`` instead of propagating to the caller.
    """
    try:
        _render_sidebar()

        col1, col2 = st.columns([1, 1], gap="large")

        with col1:
            _render_video_upload()

        with col2:
            background, bg_type = _render_background_settings()
            st.header("3. Process Video")

            # Placeholders are created before the button so that progress is
            # displayed above it while the job runs.
            progress_container = st.container()
            with progress_container:
                progress_bar = st.progress(0)
                status_text = st.empty()
                stage_status = st.empty()

            can_process = (
                st.session_state.get('video_bytes_cache') is not None
                and not st.session_state.get('processing', False)
                and (background is not None)
            )

            if st.button("Process Video", disabled=not can_process, use_container_width=True):
                _run_processing(
                    process_video_func, background, bg_type,
                    progress_bar, status_text, stage_status,
                )

            _render_processed_video()
            _render_last_error()

    except Exception as e:
        logger.error(f"[UI] Render UI error: {e}", exc_info=True)
        st.error(f"UI rendering error: {str(e)}. Check logs for details.")