MogensR's picture
Update ui.py
bf99458 verified
#!/usr/bin/env python3
"""
Modern UI for Video Background Replacer (PRO)
- File-based progress polling to keep session alive
- Real-time progress updates during long-running operations
"""
import streamlit as st
import os
from pathlib import Path
from PIL import Image
import numpy as np
import logging
import time
import threading
from utils.progress_tracker import get_progress
logger = logging.getLogger("Advanced Video Background Replacer")
UI_BUILD = "ui-2025-10-04-17-00Z"
def tail_file(path: str, lines: int = 400) -> str:
if not os.path.exists(path):
return "(log file not found)"
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
content = f.readlines()
return "".join(content[-lines:])
except Exception as e:
return f"(failed to read log: {e})"
def read_file_bytes(path: str) -> bytes:
try:
if not os.path.exists(path):
return b""
with open(path, "rb") as f:
return f.read()
except Exception:
return b""
def _render_background_settings():
stock_images = {
"Sunset Beach": "stock_images/sunset_beach.jpg",
"Urban Office": "stock_images/urban_office.jpg",
"Studio Lighting": "stock_images/studio_light.jpg",
}
st.header("2. Background Settings")
bg_type = st.radio(
"Select Background Type:",
["Image", "Color", "Stock", "AI Prompt"],
horizontal=True,
key="bg_type_radio"
)
background = None
if bg_type == "Image":
bg_image = st.file_uploader(
"Upload Background Image",
type=["jpg", "png", "jpeg"],
key="bg_image_uploader"
)
if bg_image is not None:
bg_image.seek(0)
background = Image.open(bg_image).convert("RGB")
st.image(background, caption="Selected Background", use_column_width=True)
elif bg_type == "Color":
selected_color = st.color_picker(
"Choose Background Color",
st.session_state.get('bg_color', "#00FF00"),
key="color_picker"
)
background = selected_color
color_preview = np.full(
(100, 100, 3),
tuple(int(selected_color.lstrip('#')[i:i+2], 16) for i in (0, 2, 4)),
dtype=np.uint8
)
st.image(color_preview, caption="Selected Color", width=200)
elif bg_type == "Stock":
stock_choice = st.selectbox(
"Choose a professional stock background:",
list(stock_images.keys()),
key="stock_image_select"
)
stock_img_path = stock_images[stock_choice]
try:
background = Image.open(stock_img_path).convert("RGB")
st.image(background, caption=stock_choice, use_column_width=True)
except FileNotFoundError:
st.warning(f"Stock image not found: {stock_img_path}. Upload your own image or choose another.")
background = None
elif bg_type == "AI Prompt":
prompt = st.text_input("Describe the background to generate (AI):", key="ai_bg_prompt")
ai_ready = False
if st.button("Generate Background", key="gen_bg_btn") and prompt:
background = Image.new("RGB", (512, 320), (64, 32, 96))
st.session_state.generated_bg = background
st.success("AI-generated background (stub). Replace with your generator!")
ai_ready = True
elif "generated_bg" in st.session_state:
background = st.session_state.generated_bg
ai_ready = True
if ai_ready and background is not None:
st.image(background, caption="Generated Background", use_column_width=True)
return background, bg_type
def render_ui(process_video_func):
try:
with st.sidebar:
st.subheader("System Status")
st.caption(f"UI build: {UI_BUILD}")
st.markdown("**Log file:** `/tmp/app.log`")
if st.session_state.get('gpu_available', False):
try:
import torch
dev = torch.cuda.get_device_name(0)
except Exception:
dev = "Detected (name unavailable)"
st.success(f"GPU: {dev}")
else:
st.error("GPU: Not Available")
st.number_input("Tail last N lines", min_value=50, max_value=5000, step=50, key="log_tail_lines")
log_bytes = read_file_bytes("/tmp/app.log")
st.download_button(
"Download Logs",
data=log_bytes if log_bytes else b"Log file not available yet.",
file_name="app.log",
mime="text/plain",
use_container_width=True,
disabled=not bool(log_bytes)
)
with st.expander("View Log Tail", expanded=True):
if st.button("Refresh log", use_container_width=True, key="refresh_log_btn"):
st.session_state['_last_log_refresh'] = time.time()
log_text = tail_file("/tmp/app.log", st.session_state.get('log_tail_lines', 400))
st.code(log_text, language="text")
col1, col2 = st.columns([1, 1], gap="large")
with col1:
st.header("1. Upload Video")
uploaded_video = st.file_uploader(
"Upload Video",
type=["mp4", "mov", "avi", "mkv", "webm"],
key="video_uploader"
)
st.markdown("### Video Preview")
video_preview_placeholder = st.empty()
if uploaded_video is not None:
try:
uploaded_video.seek(0)
video_bytes = uploaded_video.read()
st.session_state.video_bytes_cache = video_bytes
with video_preview_placeholder.container():
st.video(video_bytes)
except Exception as e:
logger.error(f"[UI] Video preview error: {e}", exc_info=True)
video_preview_placeholder.error(f"Cannot display video: {e}")
else:
video_preview_placeholder.empty()
with col2:
background, bg_type = _render_background_settings()
st.header("3. Process Video")
progress_container = st.container()
with progress_container:
progress_bar = st.progress(0)
status_text = st.empty()
stage_status = st.empty()
can_process = (
st.session_state.get('video_bytes_cache') is not None
and not st.session_state.get('processing', False)
and (background is not None)
)
if st.button("Process Video", disabled=not can_process, use_container_width=True):
try:
logger.info("Process Video button clicked")
import io
class _MemFile:
def __init__(self, name, data):
self.name = name
self._b = io.BytesIO(data)
def read(self):
self._b.seek(0)
return self._b.read()
def seek(self, pos):
self._b.seek(pos)
st.session_state.processing = True
mem_video = _MemFile("uploaded.mp4", st.session_state.video_bytes_cache)
# Start processing in background thread
thread = threading.Thread(
target=process_video_func,
args=(mem_video, background, bg_type.lower()),
daemon=True
)
thread.start()
# Poll for progress updates
while thread.is_alive() or st.session_state.get('processing', False):
status = get_progress()
if status.get('active') or not status.get('complete'):
# Update UI
progress_bar.progress(status.get('progress', 0))
status_text.info(f"**Status:** {status.get('message', 'Processing...')}")
stage_status.markdown(f"**Current Stage:** {status.get('stage', 'Unknown')}")
# Check for errors
if status.get('error'):
status_text.error(f"**Error:** {status['error']}")
break
# Check if complete
if status.get('complete'):
break
time.sleep(1) # Poll every second
# Wait for thread to finish
thread.join(timeout=5)
# Final status check
final_status = get_progress()
if final_status.get('error'):
progress_bar.progress(final_status.get('progress', 0))
status_text.error(f"**Status:** Processing failed - {final_status['error']}")
st.error(f"Processing failed: {final_status['error']}")
elif st.session_state.get('processed_video_bytes'):
progress_bar.progress(100)
status_text.success("**Status:** Processing complete!")
st.success("Video processing complete!")
else:
status_text.error("**Status:** Processing failed. Check logs.")
st.error("Processing failed. Check logs for details.")
except Exception as e:
st.session_state.processing = False
logger.error(f"[UI] Process video error: {e}", exc_info=True)
status_text.error(f"**Status:** Error - {str(e)}")
st.error(f"Processing error: {str(e)}. Check logs for details.")
if st.session_state.get('processed_video_bytes') is not None:
st.markdown("---")
st.markdown("### Processed Video")
try:
st.video(st.session_state.processed_video_bytes)
st.download_button(
label="Download Processed Video",
data=st.session_state.processed_video_bytes,
file_name="processed_video.mp4",
mime="video/mp4",
use_container_width=True
)
except Exception as e:
logger.error(f"[UI] Display error: {e}", exc_info=True)
st.error(f"Display error: {e}")
if st.session_state.get('last_error'):
with st.expander("Last Error", expanded=True):
st.error(st.session_state.last_error)
if st.button("Clear Error"):
st.session_state.last_error = None
except Exception as e:
logger.error(f"[UI] Render UI error: {e}", exc_info=True)
st.error(f"UI rendering error: {str(e)}. Check logs for details.")