MogensR's picture
Update core/app.py
c38a46a
#!/usr/bin/env python3
"""
BackgroundFX Pro – Main Application Entry Point
Refactored modular architecture – orchestrates specialised components
"""
from __future__ import annotations
# ── Critical environment defaults (set before any imports) ────────────────
import os
# Set critical defaults directly - HF now supports underscores in env vars
os.environ.setdefault("OMP_NUM_THREADS", "2")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "max_split_size_mb:128")
os.environ.setdefault("LOG_LEVEL", "info")
os.environ.setdefault("APP_ENV", "production")
# Ensure reasonable cache defaults if not set
from pathlib import Path
_base_cache = Path.home() / ".cache"
os.environ.setdefault("HF_HOME", str(_base_cache / "huggingface"))
os.environ.setdefault("TRANSFORMERS_CACHE", str(_base_cache / "huggingface" / "hub"))
os.environ.setdefault("TORCH_HOME", str(_base_cache / "torch"))
# Synthesize CLOUDINARY_URL from parts if missing
def _ensure_cloudinary_url():
if os.getenv("CLOUDINARY_URL"):
return
key = os.getenv("CLOUDINARY_API_KEY")
sec = os.getenv("CLOUDINARY_API_SECRET")
name = os.getenv("CLOUDINARY_CLOUD_NAME")
if key and sec and name:
os.environ["CLOUDINARY_URL"] = f"cloudinary://{key}:{sec}@{name}"
_ensure_cloudinary_url()
# If you use early_env in your project, keep this import (harmless if absent)
try:
import early_env # sets OMP/MKL/OPENBLAS + torch threads safely
except Exception:
pass
import logging
import threading
import traceback
import sys
import time
from typing import Optional, Tuple, Dict, Any, Callable
# ── Logging ──────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("core.app")
# ── Ensure project root importable ───────────────────────────────────────────
PROJECT_FILE = Path(__file__).resolve()
CORE_DIR = PROJECT_FILE.parent
ROOT = CORE_DIR.parent
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
# Create loader directories if they don't exist
loaders_dir = ROOT / "models" / "loaders"
loaders_dir.mkdir(parents=True, exist_ok=True)
# ── Gradio schema patch (HF quirk) ───────────────────────────────────────────
try:
import gradio_client.utils as gc_utils
_orig_get_type = gc_utils.get_type
def _patched_get_type(schema):
if not isinstance(schema, dict):
if isinstance(schema, bool): return "boolean"
if isinstance(schema, str): return "string"
if isinstance(schema, (int, float)): return "number"
return "string"
return _orig_get_type(schema)
gc_utils.get_type = _patched_get_type
logger.info("Gradio schema patch applied")
except Exception as e:
logger.warning(f"Gradio patch failed: {e}")
# ── Core config + components ─────────────────────────────────────────────────
try:
from config.app_config import get_config
except ImportError:
# Dummy if missing
class DummyConfig:
def to_dict(self):
return {}
get_config = lambda: DummyConfig()
from utils.hardware.device_manager import DeviceManager
from utils.system.memory_manager import MemoryManager
# Try to import the new split loaders first, fall back to old if needed
try:
from models.loaders.model_loader import ModelLoader
logger.info("Using split loader architecture")
except ImportError:
logger.warning("Split loaders not found, using legacy loader")
# Fall back to old loader if split architecture isn't available yet
from models.model_loader import ModelLoader # type: ignore
from processing.video.video_processor import CoreVideoProcessor
from processing.audio.audio_processor import AudioProcessor
from utils.monitoring.progress_tracker import ProgressTracker
from utils.cv_processing import validate_video_file
# ── Optional Two-Stage import ────────────────────────────────────────────────
TWO_STAGE_AVAILABLE = False
TWO_STAGE_IMPORT_ORIGIN = ""
TWO_STAGE_IMPORT_ERROR = ""
CHROMA_PRESETS: Dict[str, Dict[str, Any]] = {"standard": {}}
TwoStageProcessor = None # type: ignore
# Try multiple import paths for two-stage processor
two_stage_paths = [
"processors.two_stage", # Your fixed version
"processing.two_stage.two_stage_processor",
"processing.two_stage",
]
for import_path in two_stage_paths:
try:
exec(f"from {import_path} import TwoStageProcessor, CHROMA_PRESETS")
TWO_STAGE_AVAILABLE = True
TWO_STAGE_IMPORT_ORIGIN = import_path
logger.info(f"Two-stage import OK ({import_path})")
break
except Exception as e:
TWO_STAGE_IMPORT_ERROR = str(e)
continue
if not TWO_STAGE_AVAILABLE:
logger.warning(f"Two-stage import FAILED from all paths: {TWO_STAGE_IMPORT_ERROR}")
# ── Quiet startup self-check (async by default) ──────────────────────────────
# Place the helper in tools/startup_selfcheck.py (with tools/__init__.py present)
try:
from tools.startup_selfcheck import schedule_startup_selfcheck
except Exception:
schedule_startup_selfcheck = None # graceful if the helper isn't shipped
# Dummy exceptions if core.exceptions not available
class ModelLoadingError(Exception):
pass
class VideoProcessingError(Exception):
pass
# ╔══════════════════════════════════════════════════════════════════════════╗
# β•‘ VideoProcessor class β•‘
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
class VideoProcessor:
"""
Main orchestrator – coordinates all specialised components.
"""
def __init__(self):
self.config = get_config()
self._patch_config_defaults(self.config) # avoid AttributeError on older configs
self.device_manager = DeviceManager()
self.memory_manager = MemoryManager(self.device_manager.get_optimal_device())
self.model_loader = ModelLoader(self.device_manager, self.memory_manager)
self.audio_processor = AudioProcessor()
self.core_processor: Optional[CoreVideoProcessor] = None
self.two_stage_processor: Optional[Any] = None
self.models_loaded = False
self.loading_lock = threading.Lock()
self.cancel_event = threading.Event()
self.progress_tracker: Optional[ProgressTracker] = None
logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}")
# ── Config hardening: add missing fields safely ───────────────────────────
@staticmethod
def _patch_config_defaults(cfg: Any) -> None:
defaults = {
# video / i/o
"use_nvenc": False,
"prefer_mp4": True,
"video_codec": "mp4v",
"audio_copy": True,
"ffmpeg_path": "ffmpeg",
# model/resource guards
"max_model_size": 0,
"max_model_size_bytes": 0,
# housekeeping
"output_dir": str((Path(__file__).resolve().parent.parent) / "outputs"),
# MatAnyone settings to ensure it's enabled
"matanyone_enabled": True,
"use_matanyone": True,
}
for k, v in defaults.items():
if not hasattr(cfg, k):
setattr(cfg, k, v)
Path(cfg.output_dir).mkdir(parents=True, exist_ok=True)
# ── Progress helper ───────────────────────────────────────────────────────
def _init_progress(self, video_path: str, cb: Optional[Callable] = None):
try:
import cv2
cap = cv2.VideoCapture(video_path)
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
if total <= 0:
total = 100
self.progress_tracker = ProgressTracker(total, cb)
except Exception as e:
logger.warning(f"Progress init failed: {e}")
self.progress_tracker = ProgressTracker(100, cb)
# ── Model loading ─────────────────────────────────────────────────────────
def load_models(self, progress_callback: Optional[Callable] = None) -> str:
with self.loading_lock:
if self.models_loaded:
return "Models already loaded and validated"
try:
self.cancel_event.clear()
if progress_callback:
progress_callback(0.0, f"Loading on {self.device_manager.get_optimal_device()}")
sam2_loaded, mat_loaded = self.model_loader.load_all_models(
progress_callback=progress_callback, cancel_event=self.cancel_event
)
if self.cancel_event.is_set():
return "Model loading cancelled"
# Get the actual models
sam2_predictor = sam2_loaded.model if sam2_loaded else None
mat_model = mat_loaded.model if mat_loaded else None # NOTE: stateful callable adapter
# Initialize core processor
self.core_processor = CoreVideoProcessor(config=self.config, models=self.model_loader)
# Initialize two-stage processor if available
self.two_stage_processor = None
if TWO_STAGE_AVAILABLE and TwoStageProcessor and (sam2_predictor or mat_model):
try:
self.two_stage_processor = TwoStageProcessor(
sam2_predictor=sam2_predictor, matanyone_model=mat_model
)
logger.info("Two-stage processor initialised")
except Exception as e:
logger.warning(f"Two-stage init failed: {e}")
self.two_stage_processor = None
self.models_loaded = True
msg = self.model_loader.get_load_summary()
# Add status about processors
if self.two_stage_processor:
msg += "\nβœ… Two-stage processor ready"
else:
msg += "\n⚠️ Two-stage processor not available"
if mat_model:
msg += "\nβœ… MatAnyone refinement active"
else:
msg += "\n⚠️ MatAnyone not loaded (edges may be rough)"
logger.info(msg)
return msg
except (AttributeError, ModelLoadingError) as e:
self.models_loaded = False
err = f"Model loading failed: {e}"
logger.error(err)
return err
except Exception as e:
self.models_loaded = False
err = f"Unexpected error during model loading: {e}"
logger.error(f"{err}\n{traceback.format_exc()}")
return err
# ── Public entry – process video ─────────────────────────────────────────
def process_video(
self,
video_path: str,
background_choice: str,
custom_background_path: Optional[str] = None,
progress_callback: Optional[Callable] = None,
use_two_stage: bool = False,
chroma_preset: str = "standard",
key_color_mode: str = "auto",
preview_mask: bool = False,
preview_greenscreen: bool = False,
) -> Tuple[Optional[str], Optional[str], str]:
# ===== BACKGROUND PATH DEBUG & FIX =====
logger.info("=" * 60)
logger.info("BACKGROUND PATH DEBUGGING")
logger.info(f"background_choice: {background_choice}")
logger.info(f"custom_background_path type: {type(custom_background_path)}")
logger.info(f"custom_background_path value: {custom_background_path}")
# Fix 1: Handle if Gradio sends a dict
if isinstance(custom_background_path, dict):
original = custom_background_path
custom_background_path = custom_background_path.get('name') or custom_background_path.get('path')
logger.info(f"Extracted path from dict: {original} -> {custom_background_path}")
# Fix 2: Handle PIL Image objects
try:
from PIL import Image
if isinstance(custom_background_path, Image.Image):
import tempfile
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
custom_background_path.save(tmp.name)
custom_background_path = tmp.name
logger.info(f"Saved PIL Image to: {custom_background_path}")
except ImportError:
pass
# Fix 3: Verify file exists when using custom background
if background_choice == "custom" or custom_background_path:
if custom_background_path:
if Path(custom_background_path).exists():
logger.info(f"βœ… Background file exists: {custom_background_path}")
else:
logger.warning(f"⚠️ Background file does not exist: {custom_background_path}")
# Try to find it in Gradio temp directories
import glob
patterns = [
"/tmp/gradio*/**/*.jpg",
"/tmp/gradio*/**/*.jpeg",
"/tmp/gradio*/**/*.png",
"/tmp/**/*.jpg",
"/tmp/**/*.jpeg",
"/tmp/**/*.png",
]
for pattern in patterns:
files = glob.glob(pattern, recursive=True)
if files:
# Get the most recent file
newest = max(files, key=os.path.getmtime)
logger.info(f"Found potential background: {newest}")
# Only use it if it was created in the last 5 minutes
if (time.time() - os.path.getmtime(newest)) < 300:
custom_background_path = newest
logger.info(f"βœ… Using recent temp file: {custom_background_path}")
break
else:
logger.error("❌ Custom background mode but path is None!")
logger.info(f"Final custom_background_path: {custom_background_path}")
logger.info("=" * 60)
if not self.models_loaded or not self.core_processor:
return None, None, "Models not loaded. Please click 'Load Models' first."
if self.cancel_event.is_set():
return None, None, "Processing cancelled"
self._init_progress(video_path, progress_callback)
ok, why = validate_video_file(video_path)
if not ok:
return None, None, f"Invalid video: {why}"
try:
# Log which mode we're using
mode = "two-stage" if use_two_stage else "single-stage"
matanyone_status = "enabled" if self.model_loader.get_matanyone() else "disabled"
logger.info(f"Processing video in {mode} mode, MatAnyone: {matanyone_status}")
# IMPORTANT: start each video with a clean MatAnyone memory
self._reset_matanyone_session()
if use_two_stage:
if not TWO_STAGE_AVAILABLE or self.two_stage_processor is None:
return None, None, "Two-stage processing not available"
final, green, msg = self._process_two_stage(
video_path,
background_choice,
custom_background_path,
progress_callback,
chroma_preset,
key_color_mode,
)
return final, green, msg
else:
final, green, msg = self._process_single_stage(
video_path,
background_choice,
custom_background_path,
progress_callback,
preview_mask,
preview_greenscreen,
)
return final, green, msg
except VideoProcessingError as e:
logger.error(f"Processing failed: {e}")
return None, None, f"Processing failed: {e}"
except Exception as e:
logger.error(f"Unexpected processing error: {e}\n{traceback.format_exc()}")
return None, None, f"Unexpected error: {e}"
# ── Private – per-video MatAnyone reset ──────────────────────────────────
def _reset_matanyone_session(self):
"""
Ensure a fresh MatAnyone memory per video. The MatAnyone loader we use returns a
callable *stateful adapter*. If present, reset() clears its InferenceCore memory.
"""
try:
mat = self.model_loader.get_matanyone()
except Exception:
mat = None
if mat is not None and hasattr(mat, "reset") and callable(mat.reset):
try:
mat.reset()
logger.info("MatAnyone session reset for new video")
except Exception as e:
logger.warning(f"MatAnyone session reset failed (continuing): {e}")
# ── Private – single-stage ───────────────────────────────────────────────
def _process_single_stage(
self,
video_path: str,
background_choice: str,
custom_background_path: Optional[str],
progress_callback: Optional[Callable],
preview_mask: bool,
preview_greenscreen: bool,
) -> Tuple[Optional[str], Optional[str], str]:
# Additional debug logging for single-stage
logger.info(f"[Single-stage] background_choice: {background_choice}")
logger.info(f"[Single-stage] custom_background_path: {custom_background_path}")
ts = int(time.time())
out_dir = Path(self.config.output_dir) / "single_stage"
out_dir.mkdir(parents=True, exist_ok=True)
out_path = str(out_dir / f"processed_{ts}.mp4")
# Process video via your CoreVideoProcessor
result = self.core_processor.process_video(
input_path=video_path,
output_path=out_path,
bg_config={
"background_choice": background_choice,
"custom_path": custom_background_path,
},
progress_callback=progress_callback,
)
if not result:
return None, None, "Video processing failed"
# Mux audio unless preview-only
if not (preview_mask or preview_greenscreen):
try:
final_path = self.audio_processor.add_audio_to_video(
original_video=video_path, processed_video=out_path
)
except Exception as e:
logger.warning(f"Audio mux failed, returning video without audio: {e}")
final_path = out_path
else:
final_path = out_path
# Build status message
try:
mat_loaded = bool(self.model_loader.get_matanyone())
except Exception:
mat_loaded = False
matanyone_status = "βœ“" if mat_loaded else "βœ—"
msg = (
"Processing completed.\n"
f"Frames: {result.get('frames', 'unknown')}\n"
f"Background: {background_choice}\n"
f"Mode: Single-stage\n"
f"MatAnyone: {matanyone_status}\n"
f"Device: {self.device_manager.get_optimal_device()}"
)
return final_path, None, msg # No green in single-stage
# ── Private – two-stage ─────────────────────────────────────────────────
def _process_two_stage(
self,
video_path: str,
background_choice: str,
custom_background_path: Optional[str],
progress_callback: Optional[Callable],
chroma_preset: str,
key_color_mode: str,
) -> Tuple[Optional[str], Optional[str], str]:
if self.two_stage_processor is None:
return None, None, "Two-stage processor not available"
# Additional debug logging for two-stage
logger.info(f"[Two-stage] background_choice: {background_choice}")
logger.info(f"[Two-stage] custom_background_path: {custom_background_path}")
import cv2
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None, None, "Could not open input video"
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 1280
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 720
cap.release()
# Prepare background
try:
background = self.core_processor.prepare_background(
background_choice, custom_background_path, w, h
)
except Exception as e:
logger.error(f"Background preparation failed: {e}")
return None, None, f"Failed to prepare background: {e}"
if background is None:
return None, None, "Failed to prepare background"
ts = int(time.time())
out_dir = Path(self.config.output_dir) / "two_stage"
out_dir.mkdir(parents=True, exist_ok=True)
final_out = str(out_dir / f"final_{ts}.mp4")
chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS.get("standard", {}))
logger.info(f"Two-stage with preset: {chroma_preset} | key_color: {key_color_mode}")
# (Per-video reset already called in process_video)
final_path, green_path, stage2_msg = self.two_stage_processor.process_full_pipeline(
video_path,
background,
final_out,
key_color_mode=key_color_mode,
chroma_settings=chroma_cfg,
progress_callback=progress_callback,
)
if final_path is None:
return None, None, stage2_msg
# Mux audio
try:
final_with_audio = self.audio_processor.add_audio_to_video(
original_video=video_path, processed_video=final_path
)
except Exception as e:
logger.warning(f"Audio mux failed: {e}")
final_with_audio = final_path
try:
mat_loaded = bool(self.model_loader.get_matanyone())
except Exception:
mat_loaded = False
matanyone_status = "βœ“" if mat_loaded else "βœ—"
msg = (
"Two-stage processing completed.\n"
f"Background: {background_choice}\n"
f"Chroma Preset: {chroma_preset}\n"
f"MatAnyone: {matanyone_status}\n"
f"Device: {self.device_manager.get_optimal_device()}"
)
return final_with_audio, green_path, msg
# ── Status helpers ───────────────────────────────────────────────────────
def get_status(self) -> Dict[str, Any]:
status = {
"models_loaded": self.models_loaded,
"two_stage_available": bool(TWO_STAGE_AVAILABLE and self.two_stage_processor),
"two_stage_origin": TWO_STAGE_IMPORT_ORIGIN or "",
"device": str(self.device_manager.get_optimal_device()),
"core_processor_loaded": self.core_processor is not None,
"config": self._safe_config_dict(),
"memory_usage": self._safe_memory_usage(),
}
try:
status["sam2_loaded"] = self.model_loader.get_sam2() is not None
status["matanyone_loaded"] = self.model_loader.get_matanyone() is not None
status["model_info"] = self.model_loader.get_model_info()
except Exception:
status["sam2_loaded"] = False
status["matanyone_loaded"] = False
if self.progress_tracker:
status["progress"] = self.progress_tracker.get_all_progress()
return status
def _safe_config_dict(self) -> Dict[str, Any]:
try:
return self.config.to_dict()
except Exception:
keys = ["use_nvenc", "prefer_mp4", "video_codec", "audio_copy",
"ffmpeg_path", "max_model_size", "max_model_size_bytes",
"output_dir", "matanyone_enabled"]
return {k: getattr(self.config, k, None) for k in keys}
def _safe_memory_usage(self) -> Dict[str, Any]:
try:
return self.memory_manager.get_memory_usage()
except Exception:
return {}
def cancel_processing(self):
self.cancel_event.set()
logger.info("Cancellation requested")
def cleanup_resources(self):
try:
self.memory_manager.cleanup_aggressive()
except Exception:
pass
try:
self.model_loader.cleanup()
except Exception:
pass
logger.info("Resources cleaned up")
# ── Singleton + thin wrappers (used by UI callbacks) ────────────────────────
processor = VideoProcessor()
def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
return processor.load_models(progress_callback)
def process_video_fixed(
video_path: str,
background_choice: str,
custom_background_path: Optional[str],
progress_callback: Optional[Callable] = None,
use_two_stage: bool = False,
chroma_preset: str = "standard",
key_color_mode: str = "auto",
preview_mask: bool = False,
preview_greenscreen: bool = False,
) -> Tuple[Optional[str], Optional[str], str]:
return processor.process_video(
video_path,
background_choice,
custom_background_path,
progress_callback,
use_two_stage,
chroma_preset,
key_color_mode,
preview_mask,
preview_greenscreen,
)
def get_model_status() -> Dict[str, Any]:
return processor.get_status()
def get_cache_status() -> Dict[str, Any]:
return processor.get_status()
PROCESS_CANCELLED = processor.cancel_event
# ── CLI entrypoint (must exist; app.py imports main) ─────────────────────────
def main():
try:
logger.info("Starting BackgroundFX Pro")
logger.info(f"Device: {processor.device_manager.get_optimal_device()}")
logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}")
# πŸ”Ή Quiet model self-check (defaults to async; set SELF_CHECK_MODE=sync to block)
if schedule_startup_selfcheck is not None:
try:
schedule_startup_selfcheck(mode=os.getenv("SELF_CHECK_MODE", "async"))
except Exception as e:
logger.error(f"Startup self-check skipped: {e}", exc_info=True)
# Log model loader type
try:
from models.loaders.model_loader import ModelLoader
logger.info("Using split loader architecture")
except Exception:
logger.info("Using legacy loader")
# FIXED: Move UI import inside main() to avoid circular dependency
# and add better error handling
try:
# Import here to break circular dependency
from ui import ui_components
# Now get the create_interface function
if hasattr(ui_components, 'create_interface'):
create_interface = ui_components.create_interface
else:
logger.error("create_interface not found in ui_components")
logger.error(f"Available attributes: {dir(ui_components)}")
raise ImportError("create_interface function not found")
except ImportError as e:
logger.error(f"Failed to import UI components: {e}")
import traceback
traceback.print_exc()
# Try alternate import method
try:
logger.info("Trying alternate import method...")
import importlib
ui_components = importlib.import_module('ui.ui_components')
create_interface = getattr(ui_components, 'create_interface')
logger.info("Alternate import successful")
except Exception as e2:
logger.error(f"Alternate import also failed: {e2}")
logger.info("System initialized but UI unavailable. Exiting.")
return
# Create and launch the interface
try:
demo = create_interface()
demo.queue().launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
debug=False,
)
except Exception as e:
logger.error(f"Failed to launch Gradio interface: {e}")
import traceback
traceback.print_exc()
except Exception as e:
logger.error(f"Fatal error in main: {e}")
import traceback
traceback.print_exc()
finally:
processor.cleanup_resources()
if __name__ == "__main__":
main()