File size: 9,921 Bytes
c137c1a
27c4727
c137c1a
f5cc616
1be662e
87688ee
1be662e
c137c1a
 
4559cb6
 
 
f5cc616
bcb443b
4559cb6
27c4727
4559cb6
1be662e
4559cb6
 
386575c
3cd16f5
20cbc35
3cd16f5
 
 
87688ee
 
27c4727
3cd16f5
27c4727
87688ee
27c4727
b8796b9
3cd16f5
87688ee
3cd16f5
b8796b9
 
1be662e
b8796b9
1be662e
87688ee
27c4727
3cd16f5
27c4727
 
ee1b711
37f2d16
4559cb6
 
f5cc616
3cd16f5
f5cc616
3cd16f5
 
 
f5cc616
 
 
 
 
3cd16f5
27c4727
bcb443b
3cd16f5
 
27c4727
3cd16f5
27c4727
3cd16f5
f5cc616
 
 
 
 
bcb443b
 
3cd16f5
42d7c0b
4559cb6
f5cc616
 
2a63856
4559cb6
1be662e
f5cc616
bd64f57
bcb443b
 
 
 
 
4559cb6
f5cc616
 
 
 
1be662e
f5cc616
 
 
386575c
f5cc616
 
 
 
 
4559cb6
 
 
80ac736
 
 
4559cb6
1be662e
ee1b711
bd64f57
1be662e
27c4727
f5cc616
1be662e
f5cc616
 
27c4727
f5cc616
 
 
 
a9f51ee
f5cc616
 
 
 
 
 
 
 
 
 
 
 
 
 
1be662e
f5cc616
 
1be662e
f5cc616
 
1be662e
 
ee1b711
1be662e
f5cc616
27c4727
 
 
fb9272e
 
 
1be662e
fb9272e
 
 
 
 
 
 
 
1be662e
fb9272e
1be662e
fb9272e
 
1be662e
fb9272e
1be662e
fb9272e
 
 
f5cc616
fb9272e
 
 
 
f5cc616
fb9272e
 
 
1be662e
fb9272e
 
1be662e
f5cc616
 
 
1be662e
f5cc616
 
 
1be662e
f5cc616
 
 
 
 
 
 
 
1be662e
f5cc616
 
1be662e
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MatAnyone adapter — Using Official API (File-Based)
(Enhanced logging, explicit error handling, and stage progress)

...
"""
from __future__ import annotations
import os
import time
import logging
import tempfile
import importlib.metadata
from pathlib import Path
from typing import Optional, Callable, Tuple

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# ---------- Progress helper ----------
def _env_flag(name: str, default: str = "0") -> bool:
    """Interpret the environment variable *name* as a boolean flag.

    Falls back to *default* when the variable is unset.  The value is
    considered truthy when it matches one of the usual "on" spellings
    (case-insensitive, surrounding whitespace ignored).
    """
    raw = os.getenv(name, default)
    return raw.strip().lower() in {"1", "true", "yes", "on"}

# Module-level progress-callback state, configured via environment variables.
_PROGRESS_CB_ENABLED = _env_flag("MATANY_PROGRESS", "1")  # master on/off switch for progress callbacks
_PROGRESS_MIN_INTERVAL = float(os.getenv("MATANY_PROGRESS_MIN_SEC", "0.25"))  # throttle window in seconds
_progress_last: float = 0.0  # time.time() of the last emitted progress event
_progress_last_msg: Optional[str] = None  # message text of the last emitted progress event
_progress_disabled: bool = False  # latched True once a callback raises; silences all further emits

def _emit_progress(cb, pct: float, msg: str):
    """Safely forward a progress update to *cb*, with throttling.

    A repeat of the same message within ``_PROGRESS_MIN_INTERVAL`` seconds is
    dropped.  The two-argument form ``cb(pct, msg)`` is tried first; if it
    raises TypeError we retry with ``cb(msg)``.  Any other exception from the
    callback permanently disables further progress emission for this process.
    """
    global _progress_last, _progress_last_msg, _progress_disabled
    if _progress_disabled or not _PROGRESS_CB_ENABLED or not cb:
        return
    now = time.time()
    is_repeat = msg == _progress_last_msg
    if is_repeat and (now - _progress_last) < _PROGRESS_MIN_INTERVAL:
        return
    try:
        try:
            cb(pct, msg)
        except TypeError:
            # Callback may only accept a single message argument.
            cb(msg)
    except Exception as e:
        _progress_disabled = True
        log.warning("[progress-cb] disabled due to exception: %s", e)
    else:
        _progress_last = now
        _progress_last_msg = msg

class MatAnyError(RuntimeError):
    """Raised for any failure inside the MatAnyone adapter layer."""

def _cuda_snapshot(device: Optional[str]) -> str:
    """Return a one-line, human-readable summary of CUDA memory usage.

    *device* may be a string such as ``"cuda:1"``; any unparsable or missing
    index falls back to device 0.  This helper never raises — failures are
    reported inside the returned string.
    """
    try:
        import torch
        if not torch.cuda.is_available():
            return "CUDA: N/A"
        idx = 0
        if device and device.startswith("cuda:"):
            try:
                idx = int(device.split(":")[1])
            except (ValueError, IndexError):
                idx = 0  # malformed index: report device 0 instead
        gib = 1024 ** 3
        return (
            f"device={idx}, name={torch.cuda.get_device_name(idx)}, "
            f"alloc={torch.cuda.memory_allocated(idx) / gib:.2f}GB, "
            f"reserved={torch.cuda.memory_reserved(idx) / gib:.2f}GB"
        )
    except Exception as e:
        return f"CUDA snapshot error: {e!r}"

def _safe_empty_cache():
    """Best-effort ``torch.cuda.empty_cache()`` with before/after memory logging.

    Does nothing (silently) when torch is not importable or CUDA is absent.
    """
    try:
        import torch
    except Exception:
        return
    try:
        if not torch.cuda.is_available():
            return
        log.info(f"[MATANY] CUDA memory before empty_cache: {_cuda_snapshot('cuda:0')}")
        torch.cuda.empty_cache()
        log.info(f"[MATANY] CUDA memory after empty_cache: {_cuda_snapshot('cuda:0')}")
    except Exception:
        # Deliberate best-effort cleanup: never let cache release break a caller.
        pass

class MatAnyoneSession:
    """
    Simple wrapper around MatAnyone's official API.
    Uses file-based input/output as designed by the MatAnyone authors.
    """
    def __init__(self, device: Optional[str] = None, precision: str = "auto"):
        """Initialize the underlying MatAnyone ``InferenceCore``.

        Args:
            device: Torch device string (e.g. ``"cuda"``, ``"cuda:1"``, ``"cpu"``).
                Defaults to ``"cuda"`` when available, else ``"cpu"``.
            precision: Precision hint, stored lower-cased.

        Raises:
            MatAnyError: If the MatAnyone package cannot be imported or initialized.
        """
        log.info(f"[MatAnyoneSession.__init__] device={device}, precision={precision}")  # [LOG+SAFETY PATCH]
        self.device = device or ("cuda" if self._cuda_available() else "cpu")
        self.precision = precision.lower()
        try:
            version = importlib.metadata.version("matanyone")
            log.info(f"[MATANY] MatAnyone version: {version}")
        except Exception:
            # Version lookup is purely informational; never fatal.
            log.info("[MATANY] MatAnyone version unknown")
        try:
            from matanyone import InferenceCore
            self.processor = InferenceCore("PeiqingYang/MatAnyone")
            log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
        except Exception as e:
            log.error(f"[MatAnyoneSession.__init__] Failed to initialize MatAnyone: {e}", exc_info=True)  # [LOG+SAFETY PATCH]
            # Chain the cause so the original traceback is preserved (PEP 3134).
            raise MatAnyError(f"Failed to initialize MatAnyone: {e}") from e
    
    def _cuda_available(self) -> bool:
        """Return True when torch is importable and reports CUDA support."""
        try:
            import torch
            return torch.cuda.is_available()
        except Exception:
            return False
    
    def process_stream(
        self,
        video_path: Path,
        seed_mask_path: Optional[Path] = None,
        out_dir: Optional[Path] = None,
        progress_cb: Optional[Callable] = None,
    ) -> Tuple[Path, Path]:
        """Run MatAnyone matting on a video file.

        Args:
            video_path: Input video file; must exist.
            seed_mask_path: Optional seed segmentation mask file.
            out_dir: Output directory (default: ``<video dir>/matanyone_output``).
            progress_cb: Optional callback, invoked as ``cb(pct, msg)`` or ``cb(msg)``.

        Returns:
            ``(alpha_path, foreground_path)`` of the produced output files.

        Raises:
            MatAnyError: On missing inputs, missing outputs, or processing failure.
        """
        log.info(f"[MatAnyoneSession.process_stream] Start: video={video_path}, mask={seed_mask_path}, out_dir={out_dir}")  # [LOG+SAFETY PATCH]
        video_path = Path(video_path)
        if not video_path.exists():
            log.error(f"[MatAnyoneSession.process_stream] Video file not found: {video_path}")  # [LOG+SAFETY PATCH]
            raise MatAnyError(f"Video file not found: {video_path}")
        if seed_mask_path and not Path(seed_mask_path).exists():
            log.error(f"[MatAnyoneSession.process_stream] Seed mask not found: {seed_mask_path}")  # [LOG+SAFETY PATCH]
            raise MatAnyError(f"Seed mask not found: {seed_mask_path}")
        out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
        out_dir.mkdir(parents=True, exist_ok=True)
        log.info(f"[MATANY] Processing video: {video_path}")
        log.info(f"[MATANY] Using mask: {seed_mask_path}")
        log.info(f"[MATANY] Output directory: {out_dir}")
        _emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")
        try:
            start_time = time.time()
            _emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
            foreground_path, alpha_path = self.processor.process_video(
                input_path=str(video_path),
                mask_path=str(seed_mask_path) if seed_mask_path else None,
                output_path=str(out_dir)
            )
            processing_time = time.time() - start_time
            log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
            log.info(f"[MATANY] Foreground output: {foreground_path}")
            log.info(f"[MATANY] Alpha output: {alpha_path}")
            fg_path = Path(foreground_path) if foreground_path else None
            al_path = Path(alpha_path) if alpha_path else None
            if not fg_path or not fg_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Foreground output not created: {fg_path}")  # [LOG+SAFETY PATCH]
                raise MatAnyError(f"Foreground output not created: {fg_path}")
            if not al_path or not al_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Alpha output not created: {al_path}")  # [LOG+SAFETY PATCH]
                raise MatAnyError(f"Alpha output not created: {al_path}")
            _emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
            log.info(f"[MatAnyoneSession.process_stream] Success, returning paths.")  # [LOG+SAFETY PATCH]
            return al_path, fg_path  # (alpha, foreground)
        except MatAnyError:
            # Already a domain error (logged at the raise site) — re-raise as-is
            # instead of double-wrapping it in another MatAnyError.
            raise
        except Exception as e:
            log.error(f"[MatAnyoneSession.process_stream] Processing failed: {e}", exc_info=True)  # [LOG+SAFETY PATCH]
            raise MatAnyError(f"MatAnyone processing failed: {e}") from e
        finally:
            _safe_empty_cache()

class MatAnyoneModel:
    """Wrapper class for MatAnyone to match app_hf.py interface"""
    def __init__(self, device="cuda"):
        """Eagerly load a MatAnyoneSession; a failure leaves ``self.loaded`` False."""
        log.info(f"[MatAnyoneModel.__init__] device={device}")  # [LOG+SAFETY PATCH]
        self.device = device
        self.session = None  # populated by _load_model on success
        self.loaded = False
        self._load_model()
    def _load_model(self):
        """Attempt to create the session, recording success in ``self.loaded``.

        Deliberately best-effort: callers probe ``self.loaded`` rather than
        catching an exception from construction.
        """
        try:
            self.session = MatAnyoneSession(device=self.device, precision="auto")
            self.loaded = True
            log.info("[MatAnyoneModel._load_model] Loaded successfully")  # [LOG+SAFETY PATCH]
        except Exception as e:
            log.error(f"[MatAnyoneModel._load_model] Error loading: {e}", exc_info=True)  # [LOG+SAFETY PATCH]
            self.loaded = False
    def replace_background(self, video_path, masks, background_path):
        """Run matting and return the foreground video path as a string.

        Args:
            video_path: Input video path.
            masks: Seed mask path (str/Path); non-path values are ignored.
            background_path: Unused by this adapter.  # NOTE(review): confirm intended

        Returns:
            Path (str) of the produced foreground video.

        Raises:
            MatAnyError: If the model is not loaded or processing fails.
        """
        log.info(f"[MatAnyoneModel.replace_background] Start")  # [LOG+SAFETY PATCH]
        if not self.loaded:
            log.error("[MatAnyoneModel.replace_background] Model not loaded")  # [LOG+SAFETY PATCH]
            raise MatAnyError("MatAnyoneModel not loaded")
        try:
            video_path = Path(video_path)
            mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
            # BUGFIX: a TemporaryDirectory() context would delete the output files
            # on exit, returning a dangling path.  Use a persistent temp dir so the
            # returned file still exists for the caller.
            output_dir = Path(tempfile.mkdtemp(prefix="matanyone_"))
            alpha_path, fg_path = self.session.process_stream(
                video_path=video_path,
                seed_mask_path=mask_path,
                out_dir=output_dir,
                progress_cb=None
            )
            log.info(f"[MatAnyoneModel.replace_background] Success, returning fg_path: {fg_path}")  # [LOG+SAFETY PATCH]
            return str(fg_path)
        except MatAnyError:
            # Domain errors from process_stream are already logged and typed.
            raise
        except Exception as e:
            log.error(f"[MatAnyoneModel.replace_background] Error: {e}", exc_info=True)  # [LOG+SAFETY PATCH]
            # Chain the cause so the original traceback is preserved (PEP 3134).
            raise MatAnyError(f"Background replacement failed: {e}") from e

def create_matanyone_session(device=None):
    """Factory helper: build a :class:`MatAnyoneSession` on the requested device."""
    log.info(f"[create_matanyone_session] device={device}")  # [LOG+SAFETY PATCH]
    session = MatAnyoneSession(device=device)
    return session

def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
    """File-based convenience entry point for MatAnyone matting.

    Returns ``(alpha_path, fg_path)`` as strings on success, or ``(None, None)``
    on any failure — errors are logged rather than raised, by design.
    """
    log.info(f"[run_matanyone_on_files] Start: video={video_path}, mask={mask_path}, out={output_dir}, device={device}")  # [LOG+SAFETY PATCH]
    try:
        session = MatAnyoneSession(device=device)
        seed = Path(mask_path) if mask_path else None
        alpha_path, fg_path = session.process_stream(
            video_path=Path(video_path),
            seed_mask_path=seed,
            out_dir=Path(output_dir),
            progress_cb=progress_callback,
        )
        log.info(f"[run_matanyone_on_files] Success, returning (alpha, fg): {alpha_path}, {fg_path}")  # [LOG+SAFETY PATCH]
        return str(alpha_path), str(fg_path)
    except Exception as e:
        log.error(f"[run_matanyone_on_files] MatAnyone processing failed: {e}", exc_info=True)  # [LOG+SAFETY PATCH]
        return None, None