File size: 9,921 Bytes
c137c1a 27c4727 c137c1a f5cc616 1be662e 87688ee 1be662e c137c1a 4559cb6 f5cc616 bcb443b 4559cb6 27c4727 4559cb6 1be662e 4559cb6 386575c 3cd16f5 20cbc35 3cd16f5 87688ee 27c4727 3cd16f5 27c4727 87688ee 27c4727 b8796b9 3cd16f5 87688ee 3cd16f5 b8796b9 1be662e b8796b9 1be662e 87688ee 27c4727 3cd16f5 27c4727 ee1b711 37f2d16 4559cb6 f5cc616 3cd16f5 f5cc616 3cd16f5 f5cc616 3cd16f5 27c4727 bcb443b 3cd16f5 27c4727 3cd16f5 27c4727 3cd16f5 f5cc616 bcb443b 3cd16f5 42d7c0b 4559cb6 f5cc616 2a63856 4559cb6 1be662e f5cc616 bd64f57 bcb443b 4559cb6 f5cc616 1be662e f5cc616 386575c f5cc616 4559cb6 80ac736 4559cb6 1be662e ee1b711 bd64f57 1be662e 27c4727 f5cc616 1be662e f5cc616 27c4727 f5cc616 a9f51ee f5cc616 1be662e f5cc616 1be662e f5cc616 1be662e ee1b711 1be662e f5cc616 27c4727 fb9272e 1be662e fb9272e 1be662e fb9272e 1be662e fb9272e 1be662e fb9272e 1be662e fb9272e f5cc616 fb9272e f5cc616 fb9272e 1be662e fb9272e 1be662e f5cc616 1be662e f5cc616 1be662e f5cc616 1be662e f5cc616 1be662e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MatAnyone adapter — Using Official API (File-Based)
(Enhanced logging, explicit error handling, and stage progress)
...
"""
from __future__ import annotations
import os
import time
import logging
import tempfile
import importlib.metadata
from pathlib import Path
from typing import Optional, Callable, Tuple
# Install a default root handler at INFO level when this module is imported.
# logging.basicConfig() does nothing if the root logger already has handlers,
# so an application that configured logging earlier keeps its own setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger used throughout this file.
log = logging.getLogger(__name__)
# ---------- Progress helper ----------
def _env_flag(name: str, default: str = "0") -> bool:
    """Interpret environment variable *name* as an on/off switch.

    The variable's value (or *default* when it is unset) is stripped of
    surrounding whitespace and lower-cased.  ``1``, ``true``, ``yes`` and
    ``on`` mean enabled; every other value means disabled.
    """
    raw_value = os.getenv(name, default)
    normalized = raw_value.strip().lower()
    enabled_spellings = ("1", "true", "yes", "on")
    return normalized in enabled_spellings
# Progress callbacks are enabled unless MATANY_PROGRESS is set to a value
# that _env_flag() does not recognise as "on".
_PROGRESS_CB_ENABLED = _env_flag("MATANY_PROGRESS", "1")

# Minimum number of seconds between two deliveries of the *same* message.
# A malformed MATANY_PROGRESS_MIN_SEC must not make the whole module fail to
# import, so fall back to the default and warn instead of raising.
_raw_min_interval = os.getenv("MATANY_PROGRESS_MIN_SEC", "0.25")
try:
    _PROGRESS_MIN_INTERVAL = float(_raw_min_interval)
except ValueError:
    log.warning(
        "[progress-cb] invalid MATANY_PROGRESS_MIN_SEC=%r; using 0.25",
        _raw_min_interval,
    )
    _PROGRESS_MIN_INTERVAL = 0.25

# Mutable throttle state shared by every _emit_progress() call.
_progress_last = 0.0  # timestamp of the last delivered update
_progress_last_msg = None  # text of the last delivered update
_progress_disabled = False  # flipped to True once a callback raises
def _emit_progress(cb, pct: float, msg: str):
    """Deliver a progress update to *cb*, throttling repeated messages.

    Nothing is sent when *cb* is falsy, when ``_PROGRESS_CB_ENABLED`` is
    false, or after an earlier callback failure set ``_progress_disabled``.
    An update is also dropped when it repeats the previously delivered
    message less than ``_PROGRESS_MIN_INTERVAL`` seconds after that delivery;
    a message with different text is always delivered.

    Args:
        cb: Callable invoked as ``cb(pct, msg)``.  If that call raises
            ``TypeError`` it is retried as ``cb(msg)``.
        pct: Progress value handed to the callback unchanged.
        msg: Status text handed to the callback.

    Any other exception raised by the callback is logged as a warning and
    disables progress reporting for the rest of the process; it is never
    propagated to the caller.
    """
    global _progress_last, _progress_last_msg, _progress_disabled
    if not cb or not _PROGRESS_CB_ENABLED or _progress_disabled:
        return

    # Use the monotonic clock for interval measurement: wall-clock time can
    # jump (NTP, manual changes) and would distort the throttle window.
    now = time.monotonic()
    if msg == _progress_last_msg and (now - _progress_last) < _PROGRESS_MIN_INTERVAL:
        return

    try:
        try:
            cb(pct, msg)
        except TypeError:
            # Fallback for callbacks that only accept the message.
            # NOTE(review): a TypeError raised *inside* a two-argument
            # callback also lands here and triggers a second call.
            cb(msg)
    except Exception as e:
        _progress_disabled = True
        log.warning("[progress-cb] disabled due to exception: %s", e)
    else:
        # Only record the delivery once the callback returned normally.
        _progress_last = now
        _progress_last_msg = msg
class MatAnyError(RuntimeError):
    """Error raised by this module when MatAnyone setup or processing fails."""
def _cuda_snapshot(device: Optional[str]) -> str:
    """Return a one-line, human-readable summary of CUDA memory usage.

    Args:
        device: Device string.  A value of the form ``"cuda:<n>"`` selects
            GPU index ``n``; anything else (or an unparsable index) selects
            index 0.

    Returns:
        ``"CUDA: N/A"`` when torch reports no CUDA support, otherwise the
        device index, device name and the allocated/reserved memory in GB.
        Any exception (including a failed ``import torch``) is reported as
        ``"CUDA snapshot error: ..."`` instead of being raised.
    """
    try:
        import torch

        if not torch.cuda.is_available():
            return "CUDA: N/A"

        gpu_index = 0
        if device and device.startswith("cuda:"):
            pieces = device.split(":")
            try:
                gpu_index = int(pieces[1])
            except (ValueError, IndexError):
                gpu_index = 0

        bytes_per_gb = 1024**3
        gpu_name = torch.cuda.get_device_name(gpu_index)
        allocated_gb = torch.cuda.memory_allocated(gpu_index) / bytes_per_gb
        reserved_gb = torch.cuda.memory_reserved(gpu_index) / bytes_per_gb
        return f"device={gpu_index}, name={gpu_name}, alloc={allocated_gb:.2f}GB, reserved={reserved_gb:.2f}GB"
    except Exception as exc:
        return f"CUDA snapshot error: {exc!r}"
def _safe_empty_cache():
    """Best-effort release of PyTorch's cached CUDA memory.

    When torch reports CUDA as available, logs a memory snapshot for
    ``cuda:0``, calls ``torch.cuda.empty_cache()`` and logs a second
    snapshot.  This is cleanup code, so it never raises: any failure
    (torch missing, CUDA runtime error, ...) is recorded at debug level
    and otherwise ignored.
    """
    try:
        import torch

        if torch.cuda.is_available():
            log.info(f"[MATANY] CUDA memory before empty_cache: {_cuda_snapshot('cuda:0')}")
            torch.cuda.empty_cache()
            log.info(f"[MATANY] CUDA memory after empty_cache: {_cuda_snapshot('cuda:0')}")
    except Exception as exc:
        # Still best-effort, but leave a trace instead of hiding the cause.
        log.debug("[MATANY] empty_cache skipped: %r", exc)
class MatAnyoneSession:
    """
    Simple wrapper around MatAnyone's official API.
    Uses file-based input/output as designed by the MatAnyone authors.
    """

    def __init__(self, device: Optional[str] = None, precision: str = "auto"):
        """Load the MatAnyone ``InferenceCore`` processor.

        Args:
            device: Device string stored on the session.  When falsy,
                ``"cuda"`` is chosen if torch reports CUDA, else ``"cpu"``.
            precision: Precision label, stored lower-cased.

        Raises:
            MatAnyError: If importing or constructing ``InferenceCore`` fails.

        NOTE(review): ``device`` and ``precision`` are only stored here; they
        are not forwarded to ``InferenceCore``.
        """
        log.info(f"[MatAnyoneSession.__init__] device={device}, precision={precision}")
        self.device = device or ("cuda" if self._cuda_available() else "cpu")
        self.precision = precision.lower()

        # The installed version is informational only; failing to read it
        # must not prevent the session from starting.
        try:
            version = importlib.metadata.version("matanyone")
            log.info(f"[MATANY] MatAnyone version: {version}")
        except Exception:
            log.info("[MATANY] MatAnyone version unknown")

        try:
            # Imported lazily so the module can be imported without matanyone.
            from matanyone import InferenceCore

            self.processor = InferenceCore("PeiqingYang/MatAnyone")
            log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
        except Exception as e:
            log.error(f"[MatAnyoneSession.__init__] Failed to initialize MatAnyone: {e}", exc_info=True)
            raise MatAnyError(f"Failed to initialize MatAnyone: {e}") from e

    def _cuda_available(self) -> bool:
        """Return True if torch imports and reports CUDA; False on any error."""
        try:
            import torch

            return torch.cuda.is_available()
        except Exception:
            return False

    def process_stream(
        self,
        video_path: Path,
        seed_mask_path: Optional[Path] = None,
        out_dir: Optional[Path] = None,
        progress_cb: Optional[Callable] = None,
    ) -> Tuple[Path, Path]:
        """Run MatAnyone video matting on a file and return the output paths.

        Args:
            video_path: Input video; must exist.
            seed_mask_path: Optional mask file; must exist when given.
            out_dir: Output directory, created if missing.  Defaults to
                ``matanyone_output`` next to the input video.
            progress_cb: Optional callback passed to ``_emit_progress``.

        Returns:
            ``(alpha_path, foreground_path)`` -- note the alpha path comes
            first, the reverse of the order ``process_video`` returns them.

        Raises:
            MatAnyError: If an input file is missing, an expected output was
                not created, or the underlying processing raises.
        """
        log.info(f"[MatAnyoneSession.process_stream] Start: video={video_path}, mask={seed_mask_path}, out_dir={out_dir}")
        video_path = Path(video_path)
        if not video_path.exists():
            log.error(f"[MatAnyoneSession.process_stream] Video file not found: {video_path}")
            raise MatAnyError(f"Video file not found: {video_path}")
        if seed_mask_path and not Path(seed_mask_path).exists():
            log.error(f"[MatAnyoneSession.process_stream] Seed mask not found: {seed_mask_path}")
            raise MatAnyError(f"Seed mask not found: {seed_mask_path}")

        out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
        out_dir.mkdir(parents=True, exist_ok=True)
        log.info(f"[MATANY] Processing video: {video_path}")
        log.info(f"[MATANY] Using mask: {seed_mask_path}")
        log.info(f"[MATANY] Output directory: {out_dir}")
        _emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")

        try:
            start_time = time.time()
            _emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
            foreground_path, alpha_path = self.processor.process_video(
                input_path=str(video_path),
                mask_path=str(seed_mask_path) if seed_mask_path else None,
                output_path=str(out_dir),
            )
            processing_time = time.time() - start_time
            log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
            log.info(f"[MATANY] Foreground output: {foreground_path}")
            log.info(f"[MATANY] Alpha output: {alpha_path}")

            # Verify that both reported outputs really exist on disk.
            fg_path = Path(foreground_path) if foreground_path else None
            al_path = Path(alpha_path) if alpha_path else None
            if not fg_path or not fg_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Foreground output not created: {fg_path}")
                raise MatAnyError(f"Foreground output not created: {fg_path}")
            if not al_path or not al_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Alpha output not created: {al_path}")
                raise MatAnyError(f"Alpha output not created: {al_path}")

            _emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
            log.info(f"[MatAnyoneSession.process_stream] Success, returning paths.")
            return al_path, fg_path  # (alpha, foreground)
        except MatAnyError:
            # Already logged above and already the right type: re-raise
            # untouched instead of logging twice and nesting the message.
            raise
        except Exception as e:
            log.error(f"[MatAnyoneSession.process_stream] Processing failed: {e}", exc_info=True)
            raise MatAnyError(f"MatAnyone processing failed: {e}") from e
        finally:
            # Release cached GPU memory whether processing succeeded or not.
            _safe_empty_cache()
class MatAnyoneModel:
    """Wrapper class for MatAnyone to match app_hf.py interface"""

    def __init__(self, device="cuda"):
        """Store *device* and try to load a session immediately.

        A load failure does not raise here; it leaves ``self.loaded`` False.
        """
        log.info(f"[MatAnyoneModel.__init__] device={device}")
        self.device = device
        self.session = None
        self.loaded = False
        self._load_model()

    def _load_model(self):
        """Create the underlying MatAnyoneSession; log and record any failure."""
        try:
            self.session = MatAnyoneSession(device=self.device, precision="auto")
            self.loaded = True
            log.info("[MatAnyoneModel._load_model] Loaded successfully")
        except Exception as e:
            log.error(f"[MatAnyoneModel._load_model] Error loading: {e}", exc_info=True)
            self.loaded = False

    def replace_background(self, video_path, masks, background_path):
        """Run matting on *video_path* and return the foreground output path.

        Args:
            video_path: Input video path.
            masks: Seed mask; used only when it is a ``str`` or ``Path``,
                otherwise no mask is passed on.
            background_path: Not used by this method.

        Returns:
            The foreground output path as a string.  The file is written to a
            fresh temporary directory that is kept on success, so the caller
            owns it and should delete it when finished.

        Raises:
            MatAnyError: If the model is not loaded or processing fails.
        """
        import shutil

        log.info(f"[MatAnyoneModel.replace_background] Start")
        if not self.loaded:
            log.error("[MatAnyoneModel.replace_background] Model not loaded")
            raise MatAnyError("MatAnyoneModel not loaded")

        output_dir = None
        try:
            video_path = Path(video_path)
            mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
            # mkdtemp() rather than a TemporaryDirectory context manager: the
            # latter deletes the directory on exit, which would remove the
            # output file before the caller could open the returned path.
            output_dir = Path(tempfile.mkdtemp(prefix="matanyone_"))
            alpha_path, fg_path = self.session.process_stream(
                video_path=video_path,
                seed_mask_path=mask_path,
                out_dir=output_dir,
                progress_cb=None,
            )
            log.info(f"[MatAnyoneModel.replace_background] Success, returning fg_path: {fg_path}")
            return str(fg_path)
        except Exception as e:
            # Nothing useful was produced, so do not leak the temp directory.
            if output_dir is not None:
                shutil.rmtree(output_dir, ignore_errors=True)
            log.error(f"[MatAnyoneModel.replace_background] Error: {e}", exc_info=True)
            raise MatAnyError(f"Background replacement failed: {e}") from e
def create_matanyone_session(device=None):
    """Construct and return a MatAnyoneSession for *device*.

    The device is passed through as-is; any exception raised by the
    session constructor propagates to the caller.
    """
    log.info(f"[create_matanyone_session] device={device}")
    new_session = MatAnyoneSession(device=device)
    return new_session
def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
    """One-shot helper: build a session and matte a single video file.

    Args:
        video_path: Input video path.
        mask_path: Seed mask path; a falsy value means no mask.
        output_dir: Directory that receives the outputs.
        device: Device string given to the session.
        progress_callback: Optional progress callback given to the session.

    Returns:
        ``(alpha_path, foreground_path)`` as strings on success, or
        ``(None, None)`` if anything fails; the failure is logged and
        not raised.
    """
    log.info(f"[run_matanyone_on_files] Start: video={video_path}, mask={mask_path}, out={output_dir}, device={device}")
    try:
        session = MatAnyoneSession(device=device)
        stream_args = {
            "video_path": Path(video_path),
            "seed_mask_path": Path(mask_path) if mask_path else None,
            "out_dir": Path(output_dir),
            "progress_cb": progress_callback,
        }
        alpha_path, fg_path = session.process_stream(**stream_args)
        log.info(f"[run_matanyone_on_files] Success, returning (alpha, fg): {alpha_path}, {fg_path}")
        outputs = (str(alpha_path), str(fg_path))
        return outputs
    except Exception as e:
        log.error(f"[run_matanyone_on_files] MatAnyone processing failed: {e}", exc_info=True)
        failure = (None, None)
        return failure
|