|
|
|
|
|
""" |
|
|
Advanced Video Background Replacer - Streamlit Entrypoint |
|
|
""" |
|
|
import os |
|
|
import sys |
|
|
import time |
|
|
from pathlib import Path |
|
|
import logging |
|
|
import logging.handlers |
|
|
import traceback |
|
|
import uuid |
|
|
from tempfile import NamedTemporaryFile |
|
|
import threading |
|
|
import streamlit as st |
|
|
|
|
|
from ui import render_ui |
|
|
from pipeline.video_pipeline import ( |
|
|
stage1_create_transparent_video, |
|
|
stage2_composite_background, |
|
|
setup_t4_environment, |
|
|
check_gpu |
|
|
) |
|
|
from models.model_loaders import load_sam2, load_matanyone |
|
|
from utils.progress_tracker import init_progress, update_progress, mark_complete, get_progress |
|
|
|
|
|
# Name of the application; also used as the logger name and the page title.
APP_NAME = "Advanced Video Background Replacer"
# Default destination of the rotating log file.
LOG_FILE = "/tmp/app.log"
# Size threshold (5 MiB) at which the log file is rotated.
LOG_MAX_BYTES = 5 * 1024 * 1024
# Number of rotated log files kept next to the active one.
LOG_BACKUPS = 5


def setup_logging(level: int = logging.INFO, log_file: str = LOG_FILE) -> logging.Logger:
    """Configure and return the application logger.

    The logger named ``APP_NAME`` gets a stdout handler and a rotating file
    handler, both using the same format. Handlers left over from an earlier
    call are removed *and closed* so repeated calls do not accumulate open
    log-file streams.

    Args:
        level: Logging level applied to the logger and to both handlers.
        log_file: Path of the rotating log file. Defaults to ``LOG_FILE``.

    Returns:
        The configured logger. If the log file cannot be opened, the logger
        still works with console output only and a warning is emitted.
    """
    logger = logging.getLogger(APP_NAME)
    logger.setLevel(level)
    logger.propagate = False

    # Closing matters for the file handler: removeHandler() alone leaves its
    # stream open. StreamHandler.close() does not close sys.stdout.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    console = logging.StreamHandler(sys.stdout)
    console.setLevel(level)
    console.setFormatter(formatter)
    logger.addHandler(console)

    try:
        file_handler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUPS, encoding="utf-8"
        )
    except OSError as exc:
        # Logging setup should not take the whole app down; keep the console.
        logger.warning(f"File logging disabled; cannot open {log_file}: {exc}")
    else:
        file_handler.setLevel(level)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger
|
|
|
|
|
# Module-wide logger used by every function in this file.
logger = setup_logging()


def custom_excepthook(type, value, tb):
    """Log an uncaught exception through the application logger.

    Installed as ``sys.excepthook`` right below, so it receives the exception
    class, the exception instance and its traceback.

    Args:
        type: Class of the uncaught exception.
        value: The exception instance.
        tb: Traceback object associated with the exception.
    """
    # An excepthook runs outside any ``except`` block, so ``exc_info=True``
    # would consult sys.exc_info() and find nothing to attach. Pass the triple
    # explicitly so the formatter renders the real traceback (once).
    logger.error(f"Unhandled: {type.__name__}: {value}", exc_info=(type, value, tb))


sys.excepthook = custom_excepthook
|
|
|
|
|
# Model handles are created when this module executes;
# process_video_background() passes both of them to stage 1.
sam2_predictor = load_sam2()
matanyone_processor = load_matanyone()
|
|
|
|
|
def initialize_session_state():
    """Seed ``st.session_state`` with defaults and record GPU availability.

    Keys that already exist in the session state keep their current value;
    only missing keys are filled in. ``gpu_available`` is resolved through
    ``check_gpu`` the first time it is found to be ``None``.
    """
    initial_values = dict(
        uploaded_video=None,
        video_bytes_cache=None,
        video_preview_placeholder=None,
        bg_image_cache=None,
        bg_preview_placeholder=None,
        bg_color="#00FF00",
        cached_color=None,
        color_display_cache=None,
        processed_video_bytes=None,
        processing=False,
        gpu_available=None,
        last_video_id=None,
        last_bg_image_id=None,
        last_error=None,
        log_level_name='INFO',
        auto_refresh_logs=False,
        log_tail_lines=400,
        generated_bg=None,
    )

    state = st.session_state
    for key in initial_values:
        if key not in state:
            state[key] = initial_values[key]

    # Probe the GPU only while the flag is still unset.
    if state.gpu_available is None:
        state.gpu_available = check_gpu(logger)
|
|
|
|
|
# Ordered (marker substring, progress percent, stage label) milestones. The
# first marker found in a pipeline message wins, so the order is significant.
_PROGRESS_MILESTONES = (
    ("Stage 1 initiated", 5, "Stage 1"),
    ("GPU engaged", 10, "Stage 1 - SAM2"),
    ("SAM2 processing frame", 15, "Stage 1 - SAM2"),
    ("SAM2 complete", 25, "Stage 1 - SAM2"),
    ("MatAnyone starting", 30, "Stage 1 - MatAnyone"),
    ("MatAnyone processing", 50, "Stage 1 - MatAnyone"),
    ("MatAnyone complete", 70, "Stage 1 - MatAnyone"),
    ("Smoothing", 75, "Stage 1 - Smoothing"),
    ("transparent video", 80, "Stage 1 - Finalizing"),
    ("Stage 1 complete", 85, "Stage 1 Complete"),
    ("Stage 2 begun", 86, "Stage 2"),
    ("Compositing", 90, "Stage 2 - Compositing"),
    ("Restoring audio", 95, "Stage 2 - Audio"),
    ("Stage 2 complete", 98, "Stage 2 Complete"),
)


def _match_progress_milestone(msg):
    """Return ``(progress, stage)`` for the first marker found in *msg*, else ``None``."""
    for marker, progress, stage in _PROGRESS_MILESTONES:
        if marker in msg:
            return progress, stage
    return None


def process_video_background(uploaded_video, background, bg_type):
    """Background thread for video processing.

    Copies the upload to a temporary file, runs stage 1 (transparent video)
    and stage 2 (background compositing), then stores the resulting bytes in
    ``st.session_state.processed_video_bytes``. Failures are logged, stored in
    ``st.session_state.last_error`` and reported through ``mark_complete``;
    they are not re-raised. ``st.session_state.processing`` is always reset
    to ``False`` at the end.

    Args:
        uploaded_video: File-like upload exposing ``name``, ``seek`` and ``read``.
        background: Background value passed through unchanged to stage 2.
        bg_type: Background type label; lower-cased before being passed on.
    """
    # NOTE(review): the summary line says this runs in a background thread, and
    # it writes to st.session_state - confirm the caller gives the thread
    # whatever context Streamlit needs for session-state access.
    run_id = uuid.uuid4().hex[:8]
    logger.info("=" * 80)
    logger.info(f"[RUN {run_id}] VIDEO PROCESSING STARTED at {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}")

    t0 = time.time()
    tmp_vid_path = None  # set once the temp copy exists so ``finally`` can remove it
    # Last milestone reported; messages without a known marker reuse it
    # instead of dropping the displayed progress back to 0.
    last_known = {"progress": 0, "stage": "Processing"}

    def progress_cb(msg):
        """Translate a pipeline message into a progress update and log it."""
        milestone = _match_progress_milestone(msg)
        if milestone is not None:
            last_known["progress"], last_known["stage"] = milestone
        update_progress(msg, last_known["progress"], last_known["stage"])
        logger.info(f"[PROGRESS] {msg}")

    try:
        init_progress()
        update_progress("Starting video processing...", 0, "Initialization")

        suffix = Path(uploaded_video.name).suffix or ".mp4"
        with NamedTemporaryFile(delete=False, suffix=suffix) as tmp_vid:
            # Record the path first so a failed write still gets cleaned up.
            tmp_vid_path = tmp_vid.name
            uploaded_video.seek(0)
            tmp_vid.write(uploaded_video.read())
        logger.info(f"[RUN {run_id}] Temporary video path: {tmp_vid_path}")

        transparent_path, audio_path = stage1_create_transparent_video(
            tmp_vid_path,
            sam2_predictor=sam2_predictor,
            matanyone_processor=matanyone_processor,
            mat_timeout_sec=300,
            progress_callback=progress_cb,
        )
        if not transparent_path or not os.path.exists(transparent_path):
            raise RuntimeError("Stage 1 failed: Transparent video not created")
        logger.info(f"[RUN {run_id}] Stage 1 completed")

        final_path = stage2_composite_background(
            transparent_path,
            audio_path,
            background,
            bg_type.lower(),
            progress_callback=progress_cb,
        )
        if not final_path or not os.path.exists(final_path):
            raise RuntimeError("Stage 2 failed: Final video not created")
        logger.info(f"[RUN {run_id}] Stage 2 completed")

        with open(final_path, 'rb') as f:
            st.session_state.processed_video_bytes = f.read()

        total = time.time() - t0
        logger.info(f"[RUN {run_id}] SUCCESS total Δ={total:.2f}s")
        mark_complete(success=True)

    except Exception as e:
        total = time.time() - t0
        error_msg = f"Processing Error: {str(e)} (Δ {total:.2f}s)"
        # One record carrying both the message and the traceback.
        logger.exception(error_msg)
        st.session_state.last_error = error_msg
        mark_complete(success=False, error=str(e))

    finally:
        # The temp copy was created with delete=False, so remove it explicitly.
        if tmp_vid_path is not None:
            try:
                os.remove(tmp_vid_path)
            except OSError as cleanup_err:
                logger.warning(
                    f"[RUN {run_id}] Could not remove temporary video {tmp_vid_path}: {cleanup_err}"
                )
        st.session_state.processing = False
        logger.info(f"[RUN {run_id}] Processing finished")
|
|
|
|
|
def main():
    """Configure the Streamlit page, initialise session state and render the UI.

    Any exception raised along the way is logged with its traceback and shown
    to the user through ``st.error`` instead of propagating.
    """
    page_settings = {
        "page_title": APP_NAME,
        "page_icon": "🎥",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
    try:
        st.set_page_config(**page_settings)
        initialize_session_state()
        # The UI receives the processing function as its callback.
        render_ui(process_video_background)
    except Exception as exc:
        logger.error(f"Main app error: {exc}", exc_info=True)
        st.error(f"App startup failed: {str(exc)}. Check logs for details.")
|
|
|
|
|
# Script entry point: run the pipeline's environment setup, then start the UI.
if __name__ == "__main__":
    setup_t4_environment()
    main()