MogensR committed on
Commit
b88e3be
Β·
1 Parent(s): 235ab01

Update core/app.py

Browse files
Files changed (1) hide show
  1. core/app.py +443 -418
core/app.py CHANGED
@@ -1,457 +1,482 @@
1
  #!/usr/bin/env python3
2
  """
3
- Two-Stage Green-Screen Processing System βœ… 2025-08-26
4
- Stage 1: Original β†’ keyed background (auto-selected colour)
5
- Stage 2: Keyed video β†’ final composite (hybrid chroma + segmentation rescue)
6
-
7
- Aligned with current project layout:
8
- * uses helpers from utils.cv_processing (segment_person_hq, refine_mask_hq)
9
- * safe local create_video_writer (no core.app dependency)
10
- * cancel support via stop_event
11
- * progress_callback(pct, desc)
12
- * fully self-contained – just drop in and import TwoStageProcessor
13
  """
14
 
15
  from __future__ import annotations
16
 
17
- import cv2, numpy as np, os, gc, pickle, logging, tempfile, traceback, threading
18
- from pathlib import Path
19
- from typing import Optional, Dict, Any, Callable, Tuple, List
20
-
21
- from utils.cv_processing import segment_person_hq, refine_mask_hq
22
 
23
- # Project logger if available
24
  try:
25
- from utils.logger import get_logger
26
- logger = get_logger(__name__)
27
  except Exception:
28
- logger = logging.getLogger(__name__)
29
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
# ---------------------------------------------------------------------------
# Local video-writer helper
# ---------------------------------------------------------------------------
def create_video_writer(output_path: str, fps: float, width: int, height: int, prefer_mp4: bool = True):
    """Open a cv2.VideoWriter, normalising the extension and falling back mp4<->avi.

    Args:
        output_path: Desired output file; empty string means "make a temp file".
        fps: Frames per second for the writer.
        width, height: Frame size in pixels.
        prefer_mp4: Try mp4/mp4v first, else avi/XVID.

    Returns:
        (writer, actual_path) on success; (None, path) when no backend opened.
    """
    try:
        ext = ".mp4" if prefer_mp4 else ".avi"
        if not output_path:
            # mkstemp instead of the deprecated, race-prone tempfile.mktemp.
            fd, output_path = tempfile.mkstemp(suffix=ext)
            os.close(fd)
        else:
            base, curr_ext = os.path.splitext(output_path)
            if curr_ext.lower() not in (".mp4", ".avi", ".mov", ".mkv"):
                output_path = base + ext

        fourcc = cv2.VideoWriter_fourcc(*("mp4v" if prefer_mp4 else "XVID"))
        writer = cv2.VideoWriter(output_path, fourcc, float(fps), (int(width), int(height)))
        if writer is not None and writer.isOpened():
            return writer, output_path

        # Primary codec/container failed — try the opposite pairing once.
        alt_ext = ".avi" if prefer_mp4 else ".mp4"
        alt_fourcc = cv2.VideoWriter_fourcc(*("XVID" if prefer_mp4 else "mp4v"))
        alt_path = os.path.splitext(output_path)[0] + alt_ext
        writer = cv2.VideoWriter(alt_path, alt_fourcc, float(fps), (int(width), int(height)))
        if writer is not None and writer.isOpened():
            return writer, alt_path
        return None, output_path
    except Exception as e:
        logger.error(f"create_video_writer failed: {e}")
        return None, output_path
58
-
59
-
60
# ---------------------------------------------------------------------------
# Key-colour helpers (fast, no external deps)
# ---------------------------------------------------------------------------
def _bgr_to_hsv_hue_deg(bgr: np.ndarray) -> np.ndarray:
    """Per-pixel hue of a BGR image, in degrees 0-360 as float32."""
    hue_half = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)[..., 0]
    # OpenCV packs hue into 0-180, so double it to get real degrees.
    return hue_half.astype(np.float32) * 2.0
67
-
68
-
69
- def _hue_distance(a_deg: float, b_deg: float) -> float:
70
- """Circular distance on the hue wheel (degrees)."""
71
- d = abs(a_deg - b_deg) % 360.0
72
- return min(d, 360.0 - d)
73
-
74
-
75
- def _key_candidates_bgr() -> dict:
76
- return {
77
- "green": {"bgr": np.array([ 0,255, 0], dtype=np.uint8), "hue": 120.0},
78
- "blue": {"bgr": np.array([255, 0, 0], dtype=np.uint8), "hue": 240.0},
79
- "cyan": {"bgr": np.array([255,255, 0], dtype=np.uint8), "hue": 180.0},
80
- "magenta": {"bgr": np.array([255, 0,255], dtype=np.uint8), "hue": 300.0},
81
- }
82
-
83
-
84
def _choose_best_key_color(frame_bgr: np.ndarray, mask_uint8: np.ndarray) -> dict:
    """Pick the candidate colour farthest from the actor's dominant hues."""
    try:
        fg_pixels = frame_bgr[mask_uint8 > 127]
        if fg_pixels.size < 1_000:
            # Too little foreground to measure reliably — default to green.
            return _key_candidates_bgr()["green"]

        hues = _bgr_to_hsv_hue_deg(fg_pixels.reshape(-1, 1, 3)).reshape(-1)
        hist, edges = np.histogram(hues, bins=36, range=(0.0, 360.0))
        # Centres of the three most populated hue bins.
        dominant = [(edges[i] + edges[i + 1]) * 0.5 for i in np.argsort(hist)[-3:]]

        def _clearance(cand_hue: float) -> float:
            # Circular distance to the *nearest* dominant hue.
            return min(abs((cand_hue - d + 180) % 360 - 180) for d in dominant)

        candidates = _key_candidates_bgr()
        best_name = max(candidates, key=lambda name: _clearance(candidates[name]["hue"]))
        return candidates[best_name]
    except Exception:
        return _key_candidates_bgr()["green"]
105
-
106
-
107
# ---------------------------------------------------------------------------
# Chroma presets
# ---------------------------------------------------------------------------
# 'tolerance' is the colour-distance threshold, 'edge_softness' the blur
# radius at matte edges, 'spill_suppression' the green-spill strength (0..1).
CHROMA_PRESETS: Dict[str, Dict[str, Any]] = {
    'standard': {'key_color': [0, 255, 0], 'tolerance': 38, 'edge_softness': 2, 'spill_suppression': 0.35},
    'studio':   {'key_color': [0, 255, 0], 'tolerance': 30, 'edge_softness': 1, 'spill_suppression': 0.45},
    'outdoor':  {'key_color': [0, 255, 0], 'tolerance': 50, 'edge_softness': 3, 'spill_suppression': 0.25},
}
115
-
116
-
117
# ---------------------------------------------------------------------------
# Two-Stage Processor
# ---------------------------------------------------------------------------
class TwoStageProcessor:
    """Two-pass compositor: extract-to-keyed video, then keyed-to-final."""

    def __init__(self, sam2_predictor=None, matanyone_model=None):
        self.matanyone = matanyone_model
        # Unwrap wrapper objects so we always hold the raw SAM2 predictor API.
        self.sam2 = self._unwrap_sam2(sam2_predictor)
        # Stage 1 persists per-frame masks here so Stage 2 can reuse them.
        self.mask_cache_dir = Path("/tmp/mask_cache")
        self.mask_cache_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"TwoStageProcessor init – SAM2: {self.sam2 is not None} | MatAnyOne: {self.matanyone is not None}")
127
-
128
- # ---------------------------------------------------------------------
129
- # Stage 1 – Original β†’ keyed (green/blue/…) -- chooses colour on 1st frame
130
- # ---------------------------------------------------------------------
131
- def stage1_extract_to_greenscreen(
132
- self,
133
- video_path: str,
134
- output_path: str,
135
- *,
136
- key_color_mode: str = "auto", # "auto" | "green" | "blue" | "cyan" | "magenta"
137
- progress_callback: Optional[Callable[[float, str], None]] = None,
138
- stop_event: Optional["threading.Event"] = None,
139
- ) -> Tuple[Optional[dict], str]:
140
-
141
- def _prog(p, d):
142
- if progress_callback:
143
- try:
144
- progress_callback(float(p), str(d))
145
- except Exception:
146
- pass
147
-
148
  try:
149
- _prog(0.0, "Stage 1: opening video…")
150
  cap = cv2.VideoCapture(video_path)
151
- if not cap.isOpened():
152
- return None, "Could not open input video"
153
-
154
- fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
155
- total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
156
- w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
157
- h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
158
-
159
- writer, out_path = create_video_writer(output_path, fps, w, h)
160
- if writer is None:
161
- cap.release()
162
- return None, "Could not create output writer"
163
-
164
- key_info: dict | None = None
165
- chosen_bgr = np.array([0, 255, 0], np.uint8) # default
166
- probe_done = False
167
- masks: List[np.ndarray] = []
168
- frame_idx = 0
169
-
170
- green_bg_template = np.zeros((h, w, 3), np.uint8) # overwritten per-frame
171
-
172
- while True:
173
- if stop_event and stop_event.is_set():
174
- _prog(1.0, "Stage 1: cancelled")
175
- break
176
-
177
- ok, frame = cap.read()
178
- if not ok:
179
- break
180
-
181
- mask = self._get_mask(frame)
182
-
183
- # decide key colour once
184
- if not probe_done:
185
- if key_color_mode.lower() == "auto":
186
- key_info = _choose_best_key_color(frame, mask)
187
- chosen_bgr = key_info["bgr"]
188
- else:
189
- cand = _key_candidates_bgr().get(key_color_mode.lower())
190
- if cand is not None:
191
- chosen_bgr = cand["bgr"]
192
- probe_done = True
193
- logger.info(f"[TwoStage] Using key colour: {key_color_mode} β†’ {chosen_bgr.tolist()}")
194
-
195
- # optional refine
196
- if self.matanyone and frame_idx % 3 == 0:
197
- try:
198
- mask = refine_mask_hq(frame, mask, self.matanyone, fallback_enabled=True)
199
- except Exception as e:
200
- logger.warning(f"MatAnyOne refine fail f={frame_idx}: {e}")
201
-
202
- # composite
203
- green_bg_template[:] = chosen_bgr
204
- gs = self._apply_greenscreen_hard(frame, mask, green_bg_template)
205
- writer.write(gs)
206
- masks.append(self._to_binary_mask(mask))
207
-
208
- frame_idx += 1
209
- pct = 0.05 + 0.9 * (frame_idx / total) if total else min(0.95, 0.05 + frame_idx * 0.002)
210
- _prog(pct, f"Stage 1: {frame_idx}/{total or '?'}")
211
-
212
  cap.release()
213
- writer.release()
 
 
 
 
 
 
 
 
 
 
 
214
 
215
- # save mask cache
216
  try:
217
- cache_file = self.mask_cache_dir / (Path(out_path).stem + "_masks.pkl")
218
- with open(cache_file, "wb") as f:
219
- pickle.dump(masks, f)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
  except Exception as e:
221
- logger.warning(f"mask cache save fail: {e}")
222
-
223
- _prog(1.0, "Stage 1: complete")
224
- return (
225
- {"path": out_path, "frames": frame_idx, "key_bgr": chosen_bgr.tolist()},
226
- f"Green-screen video created ({frame_idx} frames)"
227
- )
228
-
229
- except Exception as e:
230
- logger.error(f"Stage 1 error: {e}\n{traceback.format_exc()}")
231
- return None, f"Stage 1 failed: {e}"
232
-
233
- # ---------------------------------------------------------------------
234
- # Stage 2 – keyed video β†’ final composite (hybrid matte)
235
- # ---------------------------------------------------------------------
236
- def stage2_greenscreen_to_final(
237
  self,
238
- gs_path: str,
239
- background: np.ndarray | str,
240
- output_path: str,
241
- *,
242
- chroma_settings: Optional[Dict[str, Any]] = None,
243
- progress_callback: Optional[Callable[[float, str], None]] = None,
244
- stop_event: Optional["threading.Event"] = None,
 
 
245
  ) -> Tuple[Optional[str], str]:
 
 
 
 
 
 
246
 
247
- def _prog(p, d):
248
- if progress_callback:
249
- try:
250
- progress_callback(float(p), str(d))
251
- except Exception:
252
- pass
253
 
254
  try:
255
- _prog(0.0, "Stage 2: opening keyed video…")
256
- cap = cv2.VideoCapture(gs_path)
257
- if not cap.isOpened():
258
- return None, "Could not open keyed video"
259
-
260
- fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
261
- total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
262
- w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
263
- h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
264
-
265
- writer, out_path = create_video_writer(output_path, fps, w, h)
266
- if writer is None:
267
- cap.release()
268
- return None, "Could not create output writer"
269
-
270
- # background
271
- if isinstance(background, str):
272
- bg = cv2.imread(background, cv2.IMREAD_COLOR)
273
- if bg is None:
274
- cap.release()
275
- writer.release()
276
- return None, "Could not load background"
277
  else:
278
- bg = background
279
- bg = cv2.resize(bg, (w, h), interpolation=cv2.INTER_LANCZOS4).astype(np.uint8)
280
-
281
- # settings
282
- settings = dict(CHROMA_PRESETS['standard'])
283
- if chroma_settings:
284
- settings.update(chroma_settings)
285
-
286
- # load cached masks if any
287
- cache_file = self.mask_cache_dir / (Path(gs_path).stem + "_masks.pkl")
288
- cached_masks = None
289
- if cache_file.exists():
290
- try:
291
- with open(cache_file, 'rb') as f:
292
- cached_masks = pickle.load(f)
293
- except Exception as e:
294
- logger.warning(f"mask cache load fail: {e}")
295
-
296
- frame_idx = 0
297
- while True:
298
- if stop_event and stop_event.is_set():
299
- _prog(1.0, "Stage 2: cancelled")
300
- break
301
- ok, frame = cap.read()
302
- if not ok:
303
- break
304
-
305
- if cached_masks and frame_idx < len(cached_masks):
306
- seg_mask = cached_masks[frame_idx]
307
- else:
308
- seg_mask = self._segmentation_mask_on_stage2(frame)
309
-
310
- composite = self._chroma_key_advanced(frame, bg, settings, seg_mask)
311
-
312
- writer.write(composite)
313
- frame_idx += 1
314
- pct = 0.05 + 0.9 * (frame_idx / total) if total else min(0.95, 0.05 + frame_idx * 0.002)
315
- _prog(pct, f"Stage 2: {frame_idx}/{total or '?'}")
316
-
317
- cap.release()
318
- writer.release()
319
- _prog(1.0, "Stage 2: complete")
320
- return out_path, f"Final video created ({frame_idx} frames)"
321
  except Exception as e:
322
- logger.error(f"Stage 2 error: {e}\n{traceback.format_exc()}")
323
- return None, f"Stage 2 failed: {e}"
 
324
 
325
- # ---------------------------------------------------------------------
326
- # Full pipeline – now passes chosen key into Stage 2
327
- # ---------------------------------------------------------------------
328
- def process_full_pipeline(
329
  self,
330
  video_path: str,
331
- background: np.ndarray | str,
332
- final_output: str,
333
- *,
334
- key_color_mode: str = "auto",
335
- chroma_settings: Optional[Dict[str, Any]] = None,
336
- progress_callback: Optional[Callable[[float, str], None]] = None,
337
- stop_event: Optional["threading.Event"] = None,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
338
  ) -> Tuple[Optional[str], str]:
339
- gs_tmp = tempfile.mktemp(suffix="_gs.mp4")
 
 
 
 
 
 
 
 
 
 
 
340
  try:
341
- gs_info, msg1 = self.stage1_extract_to_greenscreen(
342
- video_path, gs_tmp,
343
- key_color_mode=key_color_mode,
344
- progress_callback=progress_callback, stop_event=stop_event
345
  )
346
- if gs_info is None:
347
- return None, msg1
348
-
349
- # inject key colour into chroma settings for Stage 2
350
- chosen_key = gs_info.get("key_bgr", [0, 255, 0])
351
- cs = dict(chroma_settings or CHROMA_PRESETS['standard'])
352
- cs['key_color'] = chosen_key
353
-
354
- result, msg2 = self.stage2_greenscreen_to_final(
355
- gs_info["path"], background, final_output,
356
- chroma_settings=cs, progress_callback=progress_callback, stop_event=stop_event
357
- )
358
- return result, msg2
359
- finally:
360
- try:
361
- os.remove(gs_tmp)
362
- except Exception:
363
- pass
364
- gc.collect()
365
-
366
- # ---------------------------------------------------------------------
367
- # Internal helpers
368
- # ---------------------------------------------------------------------
369
- def _unwrap_sam2(self, obj):
 
 
370
  try:
371
- if obj is None:
372
- return None
373
- if all(hasattr(obj, attr) for attr in ("set_image", "predict")):
374
- return obj
375
- for attr in ("model", "predictor"):
376
- inner = getattr(obj, attr, None)
377
- if inner and all(hasattr(inner, a) for a in ("set_image", "predict")):
378
- return inner
379
  except Exception as e:
380
- logger.warning(f"SAM2 unwrap fail: {e}")
381
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
382
 
383
- def _get_mask(self, frame: np.ndarray) -> np.ndarray:
384
  try:
385
- return segment_person_hq(frame, self.sam2, fallback_enabled=True)
386
- except Exception as e:
387
- logger.warning(f"Segmentation fallback: {e}")
388
- h, w = frame.shape[:2]
389
- m = np.zeros((h, w), np.uint8)
390
- m[h//6:5*h//6, w//4:3*w//4] = 255
391
- return m
392
-
393
- def _apply_greenscreen_hard(self, frame, mask, green_bg):
394
- mask_u8 = self._to_binary_mask(mask)
395
- mk = cv2.cvtColor(mask_u8, cv2.COLOR_GRAY2BGR).astype(np.float32) / 255.0
396
- out = frame.astype(np.float32) * mk + green_bg.astype(np.float32) * (1.0 - mk)
397
- return np.clip(out, 0, 255).astype(np.uint8)
398
 
399
- @staticmethod
400
- def _to_binary_mask(mask: np.ndarray) -> np.ndarray:
401
- if mask.ndim == 3:
402
- mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
403
- if mask.dtype != np.uint8:
404
- mask = (np.clip(mask, 0, 1) * 255).astype(np.uint8) if mask.max() <= 1.0 else np.clip(mask, 0, 255).astype(np.uint8)
405
- _, binm = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
406
- return binm
407
-
408
- def _segmentation_mask_on_stage2(self, frame_bgr: np.ndarray) -> Optional[np.ndarray]:
409
  try:
410
- if self.sam2 is None:
411
- return None
412
- return self._get_mask(frame_bgr)
413
  except Exception:
414
- return None
415
 
416
- def _chroma_key_advanced(
417
- self,
418
- frame_bgr: np.ndarray,
419
- bg_bgr: np.ndarray,
420
- settings: Dict[str, Any],
421
- seg_mask: Optional[np.ndarray] = None,
422
- ) -> np.ndarray:
423
  try:
424
- key = np.array(settings.get("key_color", [0, 255, 0]), dtype=np.float32)
425
- tol = float(settings.get("tolerance", 40))
426
- soft = int(settings.get("edge_softness", 2))
427
- spill= float(settings.get("spill_suppression", 0.3))
428
-
429
- f = frame_bgr.astype(np.float32)
430
- b = bg_bgr.astype(np.float32)
431
-
432
- diff = np.linalg.norm(f - key, axis=2)
433
- alpha = np.clip((diff - tol * 0.6) / max(1e-6, tol * 0.4), 0.0, 1.0)
434
- if soft > 0:
435
- k = soft * 2 + 1
436
- alpha = cv2.GaussianBlur(alpha, (k, k), soft)
437
-
438
- # segmentation rescue
439
- if seg_mask is not None:
440
- if seg_mask.ndim == 3:
441
- seg_mask = cv2.cvtColor(seg_mask, cv2.COLOR_BGR2GRAY)
442
- seg = seg_mask.astype(np.float32) / 255.0
443
- seg = cv2.GaussianBlur(seg, (5, 5), 1.0)
444
- alpha = np.clip(np.maximum(alpha, seg * 0.85), 0.0, 1.0)
445
-
446
- # spill suppression
447
- if spill > 0:
448
- zone = 1.0 - alpha
449
- g = f[:, :, 1]
450
- f[:, :, 1] = np.clip(g - g * zone * spill, 0, 255)
451
-
452
- mask3 = np.stack([alpha] * 3, axis=2)
453
- out = f * mask3 + b * (1.0 - mask3)
454
- return np.clip(out, 0, 255).astype(np.uint8)
455
- except Exception as e:
456
- logger.error(f"Chroma key error: {e}")
457
- return frame_bgr
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  #!/usr/bin/env python3
2
  """
3
+ BackgroundFX Pro – Main Application Entry Point
4
+ Refactored modular architecture – orchestrates specialised components
 
 
 
 
 
 
 
 
5
  """
6
 
7
  from __future__ import annotations
8
 
9
+ # ── Early env/threading hygiene (safe default to silence libgomp) ────────────
10
+ import os
11
+ if not os.environ.get("OMP_NUM_THREADS", "").isdigit():
12
+ os.environ["OMP_NUM_THREADS"] = "2"
 
13
 
14
+ # If you use early_env in your project, keep this import (harmless if absent)
15
  try:
16
+ import early_env # sets OMP/MKL/OPENBLAS + torch threads safely
 
17
  except Exception:
18
+ pass
19
 
20
+ import logging
21
+ import threading
22
+ import traceback
23
+ import sys
24
+ from pathlib import Path
25
+ from typing import Optional, Tuple, Dict, Any, Callable
26
+
27
+ # ── Logging ──────────────────────────────────────────────────────────────────
28
+ logging.basicConfig(
29
+ level=logging.INFO,
30
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
31
+ )
32
+ logger = logging.getLogger("core.app")
33
+
34
+ # ── Ensure project root importable ───────────────────────────────────────────
35
+ PROJECT_FILE = Path(__file__).resolve()
36
+ CORE_DIR = PROJECT_FILE.parent
37
+ ROOT = CORE_DIR.parent
38
+ if str(ROOT) not in sys.path:
39
+ sys.path.insert(0, str(ROOT))
40
+
41
+ # ── Gradio schema patch (HF quirk) ───────────────────────────────────────────
42
+ try:
43
+ import gradio_client.utils as gc_utils
44
+ _orig_get_type = gc_utils.get_type
45
+ def _patched_get_type(schema):
46
+ if not isinstance(schema, dict):
47
+ if isinstance(schema, bool): return "boolean"
48
+ if isinstance(schema, str): return "string"
49
+ if isinstance(schema, (int, float)): return "number"
50
+ return "string"
51
+ return _orig_get_type(schema)
52
+ gc_utils.get_type = _patched_get_type
53
+ logger.info("Gradio schema patch applied")
54
+ except Exception as e:
55
+ logger.warning(f"Gradio patch failed: {e}")
56
+
57
+ # ── Core config + components ─────────────────────────────────────────────────
58
+ from config.app_config import get_config
59
+ from core.exceptions import ModelLoadingError, VideoProcessingError
60
+ from utils.hardware.device_manager import DeviceManager
61
+ from utils.system.memory_manager import MemoryManager
62
+ from models.loaders.model_loader import ModelLoader
63
+ from processing.video.video_processor import CoreVideoProcessor
64
+ from processing.audio.audio_processor import AudioProcessor
65
+ from utils.monitoring.progress_tracker import ProgressTracker
66
+ from utils.cv_processing import validate_video_file
67
+
68
+ # ── Optional Two-Stage import (exact module path) ────────────────────────────
69
+ TWO_STAGE_AVAILABLE = False
70
+ TWO_STAGE_IMPORT_ORIGIN = ""
71
+ TWO_STAGE_IMPORT_ERROR = ""
72
+ CHROMA_PRESETS: Dict[str, Dict[str, Any]] = {"standard": {}}
73
+ TwoStageProcessor = None # type: ignore
74
 
75
+ try:
76
+ from processing.two_stage.two_stage_processor import TwoStageProcessor, CHROMA_PRESETS # type: ignore
77
+ TWO_STAGE_AVAILABLE = True
78
+ TWO_STAGE_IMPORT_ORIGIN = "processing.two_stage.two_stage_processor"
79
+ logger.info("Two-stage import OK (%s)", TWO_STAGE_IMPORT_ORIGIN)
80
+ except Exception as e:
81
+ TWO_STAGE_AVAILABLE = False
82
+ TWO_STAGE_IMPORT_ERROR = f"{repr(e)}\n{traceback.format_exc()}"
83
+ logger.warning("Two-stage import FAILED: %s", TWO_STAGE_IMPORT_ERROR.strip().splitlines()[-1])
84
+
85
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                           VideoProcessor class                           ║
# ╚══════════════════════════════════════════════════════════════════════════╝
class VideoProcessor:
    """
    Main orchestrator – coordinates all specialised components.
    """

    def __init__(self):
        self.config = get_config()
        self._patch_config_defaults(self.config)  # avoid AttributeError on older configs

        # Hardware/resource managers come first; everything else depends on them.
        self.device_manager = DeviceManager()
        self.memory_manager = MemoryManager(self.device_manager.get_optimal_device())
        self.model_loader = ModelLoader(self.device_manager, self.memory_manager)

        self.audio_processor = AudioProcessor()
        self.core_processor: CoreVideoProcessor | None = None
        self.two_stage_processor: Any | None = None

        # Loading state + cooperative cancellation shared across the UI.
        self.models_loaded = False
        self.loading_lock = threading.Lock()
        self.cancel_event = threading.Event()
        self.progress_tracker: ProgressTracker | None = None

        logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}")
111
+
112
+ # ── Config hardening: add missing fields safely ───────────────────────────
113
+ @staticmethod
114
+ def _patch_config_defaults(cfg: Any) -> None:
115
+ defaults = {
116
+ # video / i/o
117
+ "use_nvenc": False,
118
+ "prefer_mp4": True,
119
+ "video_codec": "mp4v",
120
+ "audio_copy": True,
121
+ "ffmpeg_path": "ffmpeg",
122
+ # model/resource guards
123
+ "max_model_size": 0,
124
+ "max_model_size_bytes": 0,
125
+ # housekeeping
126
+ "output_dir": str((Path(__file__).resolve().parent.parent) / "outputs"),
127
+ }
128
+ for k, v in defaults.items():
129
+ if not hasattr(cfg, k):
130
+ setattr(cfg, k, v)
131
+ Path(cfg.output_dir).mkdir(parents=True, exist_ok=True)
132
+
133
+ # ── Progress helper ───────────────────────────────────────────────────────
134
+ def _init_progress(self, video_path: str, cb: Optional[Callable] = None):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135
  try:
136
+ import cv2
137
  cap = cv2.VideoCapture(video_path)
138
+ total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
  cap.release()
140
+ if total <= 0:
141
+ total = 100
142
+ self.progress_tracker = ProgressTracker(total, cb)
143
+ except Exception as e:
144
+ logger.warning(f"Progress init failed: {e}")
145
+ self.progress_tracker = ProgressTracker(100, cb)
146
+
147
+ # ── Model loading ─────────────────────────────────────────────────────────
148
+ def load_models(self, progress_callback: Optional[Callable] = None) -> str:
149
+ with self.loading_lock:
150
+ if self.models_loaded:
151
+ return "Models already loaded and validated"
152
 
 
153
  try:
154
+ self.cancel_event.clear()
155
+ if progress_callback:
156
+ progress_callback(0.0, f"Loading on {self.device_manager.get_optimal_device()}")
157
+
158
+ sam2_loaded, mat_loaded = self.model_loader.load_all_models(
159
+ progress_callback=progress_callback, cancel_event=self.cancel_event
160
+ )
161
+ if self.cancel_event.is_set():
162
+ return "Model loading cancelled"
163
+
164
+ # unwrap
165
+ sam2_predictor = getattr(sam2_loaded, "model", None) if sam2_loaded else None
166
+ mat_model = getattr(mat_loaded, "model", None) if mat_loaded else None
167
+
168
+ # single-stage
169
+ self.core_processor = CoreVideoProcessor(config=self.config, models=self.model_loader)
170
+
171
+ # two-stage (optional)
172
+ self.two_stage_processor = None
173
+ if TWO_STAGE_AVAILABLE and (TwoStageProcessor is not None) and (sam2_predictor or mat_model):
174
+ try:
175
+ self.two_stage_processor = TwoStageProcessor(
176
+ sam2_predictor=sam2_predictor, matanyone_model=mat_model
177
+ )
178
+ logger.info("Two-stage processor initialised")
179
+ except Exception as e:
180
+ logger.warning("Two-stage init failed: %r", e)
181
+ self.two_stage_processor = None
182
+
183
+ self.models_loaded = True
184
+ msg = self.model_loader.get_load_summary()
185
+ msg += ("\nβœ… Two-stage processor ready" if self.two_stage_processor else "\n⚠️ Two-stage processor not available")
186
+ logger.info(msg)
187
+ return msg
188
+
189
+ except (AttributeError, ModelLoadingError) as e:
190
+ self.models_loaded = False
191
+ err = f"Model loading failed: {e}"
192
+ logger.error(err)
193
+ return err
194
  except Exception as e:
195
+ self.models_loaded = False
196
+ err = f"Unexpected error during model loading: {e}"
197
+ logger.error(err)
198
+ logger.debug("Traceback:\n%s", traceback.format_exc())
199
+ return err
200
+
201
+ # ── Public entry – process video ─────────────────────────────────────────
202
+ def process_video(
 
 
 
 
 
 
 
 
203
  self,
204
+ video_path: str,
205
+ background_choice: str,
206
+ custom_background_path: Optional[str] = None,
207
+ progress_callback: Optional[Callable] = None,
208
+ use_two_stage: bool = False,
209
+ chroma_preset: str = "standard",
210
+ key_color_mode: str = "auto",
211
+ preview_mask: bool = False,
212
+ preview_greenscreen: bool = False,
213
  ) -> Tuple[Optional[str], str]:
214
+ if not self.models_loaded or not self.core_processor:
215
+ return None, "Models not loaded. Please click β€œLoad Models” first."
216
+ if self.cancel_event.is_set():
217
+ return None, "Processing cancelled"
218
+
219
+ self._init_progress(video_path, progress_callback)
220
 
221
+ ok, why = validate_video_file(video_path)
222
+ if not ok:
223
+ return None, f"Invalid video: {why}"
 
 
 
224
 
225
  try:
226
+ if use_two_stage:
227
+ if not TWO_STAGE_AVAILABLE or self.two_stage_processor is None:
228
+ return None, "Two-stage processing not available on this build"
229
+ return self._process_two_stage(
230
+ video_path,
231
+ background_choice,
232
+ custom_background_path,
233
+ progress_callback,
234
+ chroma_preset,
235
+ key_color_mode,
236
+ )
 
 
 
 
 
 
 
 
 
 
 
237
  else:
238
+ return self._process_single_stage(
239
+ video_path,
240
+ background_choice,
241
+ custom_background_path,
242
+ progress_callback,
243
+ preview_mask,
244
+ preview_greenscreen,
245
+ )
246
+
247
+ except VideoProcessingError as e:
248
+ logger.error(f"Processing failed: {e}")
249
+ return None, f"Processing failed: {e}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
250
  except Exception as e:
251
+ logger.error(f"Unexpected processing error: {e}")
252
+ logger.debug("Traceback:\n%s", traceback.format_exc())
253
+ return None, f"Unexpected error: {e}"
254
 
255
+ # ── Private – single-stage ───────────────────────────────────────────────
256
+ def _process_single_stage(
 
 
257
  self,
258
  video_path: str,
259
+ background_choice: str,
260
+ custom_background_path: Optional[str],
261
+ progress_callback: Optional[Callable],
262
+ preview_mask: bool,
263
+ preview_greenscreen: bool,
264
+ ) -> Tuple[Optional[str], str]:
265
+ import time
266
+ ts = int(time.time())
267
+ out_dir = Path(self.config.output_dir) / "single_stage"
268
+ out_dir.mkdir(parents=True, exist_ok=True)
269
+ out_path = str(out_dir / f"processed_{ts}.mp4")
270
+
271
+ result = self.core_processor.process_video(
272
+ input_path=video_path,
273
+ output_path=out_path,
274
+ bg_config={
275
+ "background_choice": background_choice,
276
+ "custom_path": custom_background_path,
277
+ },
278
+ )
279
+ if not result:
280
+ return None, "Video processing failed"
281
+
282
+ # Mux audio unless preview-only
283
+ if not (preview_mask or preview_greenscreen):
284
+ try:
285
+ final_path = self.audio_processor.add_audio_to_video(
286
+ original_video=video_path, processed_video=out_path
287
+ )
288
+ except Exception as e:
289
+ logger.warning("Audio mux failed, returning video without audio: %r", e)
290
+ final_path = out_path
291
+ else:
292
+ final_path = out_path
293
+
294
+ msg = (
295
+ "Processing completed.\n"
296
+ f"Frames: {result.get('frames', 'unknown')}\n"
297
+ f"Background: {background_choice}\n"
298
+ f"Mode: Single-stage\n"
299
+ f"Device: {self.device_manager.get_optimal_device()}"
300
+ )
301
+ return final_path, msg
302
+
303
+ # ── Private – two-stage ─────────────────────────────────────────────────
304
+ def _process_two_stage(
305
+ self,
306
+ video_path: str,
307
+ background_choice: str,
308
+ custom_background_path: Optional[str],
309
+ progress_callback: Optional[Callable],
310
+ chroma_preset: str,
311
+ key_color_mode: str,
312
  ) -> Tuple[Optional[str], str]:
313
+ if self.two_stage_processor is None:
314
+ return None, "Two-stage processor not available"
315
+
316
+ import cv2, time
317
+ cap = cv2.VideoCapture(video_path)
318
+ if not cap.isOpened():
319
+ return None, "Could not open input video"
320
+ w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 1280
321
+ h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 720
322
+ cap.release()
323
+
324
+ # Prepare background via core processor
325
  try:
326
+ background = self.core_processor.prepare_background(
327
+ background_choice, custom_background_path, w, h
 
 
328
  )
329
+ except Exception as e:
330
+ logger.error("Background preparation failed: %r", e)
331
+ return None, f"Failed to prepare background: {e}"
332
+ if background is None:
333
+ return None, "Failed to prepare background"
334
+
335
+ ts = int(time.time())
336
+ out_dir = Path(self.config.output_dir) / "two_stage"
337
+ out_dir.mkdir(parents=True, exist_ok=True)
338
+ final_out = str(out_dir / f"final_{ts}.mp4")
339
+
340
+ chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS.get("standard", {}))
341
+ logger.info("Two-stage with preset: %s | key_color_mode=%s", chroma_preset, key_color_mode)
342
+
343
+ result, message = self.two_stage_processor.process_full_pipeline(
344
+ video_path,
345
+ background,
346
+ final_out,
347
+ key_color_mode=key_color_mode,
348
+ chroma_settings=chroma_cfg,
349
+ progress_callback=progress_callback,
350
+ )
351
+ if result is None:
352
+ return None, message
353
+
354
+ # Mux audio from original (same logic as single-stage)
355
  try:
356
+ final_path = self.audio_processor.add_audio_to_video(
357
+ original_video=video_path, processed_video=result
358
+ )
 
 
 
 
 
359
  except Exception as e:
360
+ logger.warning("Audio mux failed for two-stage; returning video without audio: %r", e)
361
+ final_path = result
362
+
363
+ msg = (
364
+ "Two-stage processing completed.\n"
365
+ f"Background: {background_choice}\n"
366
+ f"Chroma Preset: {chroma_preset}\n"
367
+ f"Device: {self.device_manager.get_optimal_device()}"
368
+ )
369
+ return final_path, msg
370
+
371
+ # ── Status helpers ───────────────────────────────────────────────────────
372
+ def get_status(self) -> Dict[str, Any]:
373
+ status = {
374
+ "models_loaded": self.models_loaded,
375
+ "two_stage_available": bool(TWO_STAGE_AVAILABLE and (self.two_stage_processor is not None)),
376
+ "two_stage_origin": TWO_STAGE_IMPORT_ORIGIN or "",
377
+ "two_stage_error": TWO_STAGE_IMPORT_ERROR[:2000] if TWO_STAGE_IMPORT_ERROR else "",
378
+ "device": str(self.device_manager.get_optimal_device()),
379
+ "core_processor_loaded": self.core_processor is not None,
380
+ "config": self._safe_config_dict(),
381
+ "memory_usage": self._safe_memory_usage(),
382
+ }
383
+ try:
384
+ status["sam2_loaded"] = self.model_loader.get_sam2() is not None
385
+ status["matanyone_loaded"] = self.model_loader.get_matanyone() is not None
386
+ except Exception:
387
+ status["sam2_loaded"] = False
388
+ status["matanyone_loaded"] = False
389
+
390
+ if self.progress_tracker:
391
+ status["progress"] = self.progress_tracker.get_all_progress()
392
+ return status
393
 
394
+ def _safe_config_dict(self) -> Dict[str, Any]:
395
  try:
396
+ return self.config.to_dict()
397
+ except Exception:
398
+ keys = ["use_nvenc", "prefer_mp4", "video_codec", "audio_copy",
399
+ "ffmpeg_path", "max_model_size", "max_model_size_bytes", "output_dir"]
400
+ return {k: getattr(self.config, k, None) for k in keys}
 
 
 
 
 
 
 
 
401
 
402
+ def _safe_memory_usage(self) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
403
  try:
404
+ return self.memory_manager.get_memory_usage()
 
 
405
  except Exception:
406
+ return {}
407
 
408
+ def cancel_processing(self):
409
+ self.cancel_event.set()
410
+ logger.info("Cancellation requested")
411
+
412
+ def cleanup_resources(self):
 
 
413
  try:
414
+ self.memory_manager.cleanup_aggressive()
415
+ except Exception:
416
+ pass
417
+ try:
418
+ self.model_loader.cleanup()
419
+ except Exception:
420
+ pass
421
+ logger.info("Resources cleaned up")
422
+
423
+
424
# ── Singleton + thin wrappers (used by UI callbacks) ────────────────────────
processor = VideoProcessor()


def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str:
    """UI wrapper around VideoProcessor.load_models."""
    return processor.load_models(progress_callback)


def process_video_fixed(
    video_path: str,
    background_choice: str,
    custom_background_path: Optional[str],
    progress_callback: Optional[Callable] = None,
    use_two_stage: bool = False,
    chroma_preset: str = "standard",
    key_color_mode: str = "auto",
    preview_mask: bool = False,
    preview_greenscreen: bool = False,
) -> Tuple[Optional[str], str]:
    """UI wrapper around VideoProcessor.process_video (positional pass-through)."""
    return processor.process_video(
        video_path,
        background_choice,
        custom_background_path,
        progress_callback,
        use_two_stage,
        chroma_preset,
        key_color_mode,
        preview_mask,
        preview_greenscreen,
    )


def get_model_status() -> Dict[str, Any]:
    return processor.get_status()


def get_cache_status() -> Dict[str, Any]:
    # NOTE(review): intentionally identical to get_model_status today.
    return processor.get_status()


# Shared cancel flag exported for UI callbacks.
PROCESS_CANCELLED = processor.cancel_event
460
+
461
+
462
# ── CLI entrypoint (must exist; app.py imports main) ─────────────────────────
def main():
    """Launch the Gradio UI; always release resources on exit."""
    try:
        logger.info("Starting BackgroundFX Pro")
        logger.info(f"Device: {processor.device_manager.get_optimal_device()}")
        logger.info("Two-stage available (import): %s", TWO_STAGE_AVAILABLE)

        # Imported lazily so importing this module never pulls in the UI stack.
        from ui.ui_components import create_interface
        demo = create_interface()
        demo.queue().launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            debug=False,
        )
    finally:
        processor.cleanup_resources()


if __name__ == "__main__":
    main()