Update utils/cv_processing.py
Browse files- utils/cv_processing.py +386 -252
utils/cv_processing.py
CHANGED
|
@@ -3,7 +3,7 @@
|
|
| 3 |
Contains segmentation, mask refinement, background replacement, and helper functions
|
| 4 |
"""
|
| 5 |
|
| 6 |
-
#
|
| 7 |
import os
|
| 8 |
if 'OMP_NUM_THREADS' not in os.environ:
|
| 9 |
os.environ['OMP_NUM_THREADS'] = '4'
|
|
@@ -23,10 +23,18 @@
|
|
| 23 |
|
| 24 |
# Version control flags for CV functions
|
| 25 |
USE_ENHANCED_SEGMENTATION = True
|
| 26 |
-
USE_AUTO_TEMPORAL_CONSISTENCY = True
|
| 27 |
USE_INTELLIGENT_PROMPTING = True
|
| 28 |
USE_ITERATIVE_REFINEMENT = True
|
| 29 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 30 |
# Professional background templates
|
| 31 |
PROFESSIONAL_BACKGROUNDS = {
|
| 32 |
"office_modern": {
|
|
@@ -102,14 +110,10 @@ class BackgroundReplacementError(Exception):
|
|
| 102 |
pass
|
| 103 |
|
| 104 |
# ============================================================================
|
| 105 |
-
#
|
| 106 |
# ============================================================================
|
| 107 |
|
| 108 |
def _fit_image_letterbox(img_rgb: np.ndarray, dst_w: int, dst_h: int, fill=(32, 32, 32)) -> np.ndarray:
|
| 109 |
-
"""
|
| 110 |
-
Fit an RGB image into (dst_h, dst_w) with letterboxing (no stretch), borders filled with `fill`.
|
| 111 |
-
Returns an RGB image.
|
| 112 |
-
"""
|
| 113 |
h, w = img_rgb.shape[:2]
|
| 114 |
if h == 0 or w == 0:
|
| 115 |
return np.full((dst_h, dst_w, 3), fill, dtype=np.uint8)
|
|
@@ -136,7 +140,7 @@ def _fit_image_letterbox(img_rgb: np.ndarray, dst_w: int, dst_h: int, fill=(32,
|
|
| 136 |
# ============================================================================
|
| 137 |
|
| 138 |
def segment_person_hq(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 139 |
-
"""High-quality person segmentation with intelligent automation"""
|
| 140 |
if not USE_ENHANCED_SEGMENTATION:
|
| 141 |
return segment_person_hq_original(image, predictor, fallback_enabled)
|
| 142 |
|
|
@@ -146,50 +150,40 @@ def segment_person_hq(image: np.ndarray, predictor: Any, fallback_enabled: bool
|
|
| 146 |
raise SegmentationError("Invalid input image")
|
| 147 |
|
| 148 |
try:
|
| 149 |
-
|
| 150 |
-
|
| 151 |
-
|
| 152 |
-
|
| 153 |
-
|
| 154 |
-
|
| 155 |
-
|
| 156 |
-
|
| 157 |
-
if fallback_enabled:
|
| 158 |
-
return _fallback_segmentation(image)
|
| 159 |
-
raise SegmentationError("Invalid predictor object")
|
| 160 |
-
|
| 161 |
-
try:
|
| 162 |
-
predictor.set_image(image)
|
| 163 |
-
except Exception as e:
|
| 164 |
-
logger.error(f"Failed to set image in predictor: {e}")
|
| 165 |
-
if fallback_enabled:
|
| 166 |
-
return _fallback_segmentation(image)
|
| 167 |
-
raise SegmentationError(f"Predictor setup failed: {e}")
|
| 168 |
|
| 169 |
-
|
| 170 |
-
|
| 171 |
-
else:
|
| 172 |
-
mask = _segment_with_basic_prompts(image, predictor, fallback_enabled)
|
| 173 |
|
| 174 |
-
|
| 175 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 176 |
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
|
| 180 |
-
|
| 181 |
-
|
| 182 |
|
| 183 |
-
logger.
|
| 184 |
-
return
|
| 185 |
|
| 186 |
-
except SegmentationError:
|
| 187 |
-
raise
|
| 188 |
except Exception as e:
|
| 189 |
logger.error(f"Unexpected segmentation error: {e}")
|
| 190 |
if fallback_enabled:
|
| 191 |
-
return
|
| 192 |
-
|
|
|
|
| 193 |
|
| 194 |
def segment_person_hq_original(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 195 |
"""Original version of person segmentation for rollback"""
|
|
@@ -197,89 +191,56 @@ def segment_person_hq_original(image: np.ndarray, predictor: Any, fallback_enabl
|
|
| 197 |
raise SegmentationError("Invalid input image")
|
| 198 |
|
| 199 |
try:
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
return _fallback_segmentation(image)
|
| 204 |
-
raise SegmentationError("SAM2 predictor not available")
|
| 205 |
-
|
| 206 |
-
if not hasattr(predictor, 'set_image') or not hasattr(predictor, 'predict'):
|
| 207 |
-
logger.warning("Predictor missing required methods, using fallback")
|
| 208 |
-
if fallback_enabled:
|
| 209 |
-
return _fallback_segmentation(image)
|
| 210 |
-
raise SegmentationError("Invalid predictor object")
|
| 211 |
-
|
| 212 |
-
try:
|
| 213 |
predictor.set_image(image)
|
| 214 |
-
except Exception as e:
|
| 215 |
-
logger.error(f"Failed to set image in predictor: {e}")
|
| 216 |
-
if fallback_enabled:
|
| 217 |
-
return _fallback_segmentation(image)
|
| 218 |
-
raise SegmentationError(f"Predictor setup failed: {e}")
|
| 219 |
|
| 220 |
-
|
| 221 |
-
|
| 222 |
-
|
| 223 |
-
|
| 224 |
-
|
| 225 |
-
|
| 226 |
-
|
| 227 |
-
|
| 228 |
-
|
| 229 |
-
|
| 230 |
-
[3*w//4, 2*h//3],
|
| 231 |
-
], dtype=np.float32)
|
| 232 |
|
| 233 |
-
|
| 234 |
|
| 235 |
-
try:
|
| 236 |
with torch.no_grad():
|
| 237 |
masks, scores, _ = predictor.predict(
|
| 238 |
point_coords=points,
|
| 239 |
point_labels=labels,
|
| 240 |
multimask_output=True
|
| 241 |
)
|
| 242 |
-
except Exception as e:
|
| 243 |
-
logger.error(f"SAM2 prediction failed: {e}")
|
| 244 |
-
if fallback_enabled:
|
| 245 |
-
return _fallback_segmentation(image)
|
| 246 |
-
raise SegmentationError(f"Prediction failed: {e}")
|
| 247 |
|
| 248 |
-
|
| 249 |
-
|
| 250 |
-
|
| 251 |
-
|
| 252 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 253 |
|
| 254 |
-
if
|
| 255 |
-
logger.warning("
|
| 256 |
-
|
| 257 |
else:
|
| 258 |
-
|
| 259 |
-
best_mask = masks[best_idx]
|
| 260 |
-
logger.debug(f"Selected mask {best_idx} with score {scores[best_idx]:.3f}")
|
| 261 |
-
|
| 262 |
-
mask = _process_mask(best_mask)
|
| 263 |
|
| 264 |
-
if not _validate_mask_quality(mask, image.shape[:2]):
|
| 265 |
-
logger.warning("Mask quality validation failed")
|
| 266 |
-
if fallback_enabled:
|
| 267 |
-
return _fallback_segmentation(image)
|
| 268 |
-
raise SegmentationError("Poor mask quality")
|
| 269 |
-
|
| 270 |
-
logger.debug(f"Segmentation successful - mask range: {mask.min()}-{mask.max()}")
|
| 271 |
-
return mask
|
| 272 |
-
|
| 273 |
-
except SegmentationError:
|
| 274 |
-
raise
|
| 275 |
except Exception as e:
|
| 276 |
logger.error(f"Unexpected segmentation error: {e}")
|
| 277 |
if fallback_enabled:
|
| 278 |
-
return
|
| 279 |
-
|
|
|
|
| 280 |
|
| 281 |
# ============================================================================
|
| 282 |
-
# MASK REFINEMENT
|
| 283 |
# ============================================================================
|
| 284 |
|
| 285 |
def refine_mask_hq(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any,
|
|
@@ -291,6 +252,7 @@ def refine_mask_hq(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any
|
|
| 291 |
try:
|
| 292 |
mask = _process_mask(mask)
|
| 293 |
|
|
|
|
| 294 |
if matanyone_processor is not None:
|
| 295 |
try:
|
| 296 |
logger.debug("Attempting MatAnyone refinement")
|
|
@@ -300,16 +262,43 @@ def refine_mask_hq(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any
|
|
| 300 |
logger.debug("MatAnyone refinement successful")
|
| 301 |
return refined_mask
|
| 302 |
else:
|
| 303 |
-
logger.warning("
|
| 304 |
|
| 305 |
except Exception as e:
|
| 306 |
-
logger.warning(f"
|
| 307 |
|
| 308 |
-
|
|
|
|
| 309 |
logger.debug("Using enhanced OpenCV refinement")
|
| 310 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 311 |
|
| 312 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 313 |
|
| 314 |
except MaskRefinementError:
|
| 315 |
raise
|
|
@@ -317,7 +306,8 @@ def refine_mask_hq(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any
|
|
| 317 |
logger.error(f"Unexpected mask refinement error: {e}")
|
| 318 |
if fallback_enabled:
|
| 319 |
return enhance_mask_opencv_advanced(image, mask)
|
| 320 |
-
|
|
|
|
| 321 |
|
| 322 |
def enhance_mask_opencv_advanced(image: np.ndarray, mask: np.ndarray) -> np.ndarray:
|
| 323 |
"""Advanced OpenCV-based mask enhancement with multiple techniques"""
|
|
@@ -338,6 +328,7 @@ def enhance_mask_opencv_advanced(image: np.ndarray, mask: np.ndarray) -> np.ndar
|
|
| 338 |
refined_mask = cv2.morphologyEx(refined_mask, cv2.MORPH_OPEN, kernel_open)
|
| 339 |
|
| 340 |
refined_mask = cv2.GaussianBlur(refined_mask, (3, 3), 0.8)
|
|
|
|
| 341 |
_, refined_mask = cv2.threshold(refined_mask, 127, 255, cv2.THRESH_BINARY)
|
| 342 |
|
| 343 |
return refined_mask
|
|
@@ -353,22 +344,23 @@ def enhance_mask_opencv_advanced(image: np.ndarray, mask: np.ndarray) -> np.ndar
|
|
| 353 |
def _matanyone_refine(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any) -> Optional[np.ndarray]:
|
| 354 |
"""Safe MatAnyOne refinement for a single frame with correct interface."""
|
| 355 |
try:
|
|
|
|
| 356 |
if not hasattr(matanyone_processor, 'step') or not hasattr(matanyone_processor, 'output_prob_to_mask'):
|
| 357 |
logger.warning("MatAnyOne processor missing required methods (step, output_prob_to_mask)")
|
| 358 |
return None
|
| 359 |
|
| 360 |
-
# image
|
| 361 |
if isinstance(image, np.ndarray):
|
| 362 |
img = image.astype(np.float32)
|
| 363 |
if img.max() > 1.0:
|
| 364 |
img /= 255.0
|
| 365 |
if img.shape[2] == 3:
|
| 366 |
-
img = np.transpose(img, (2, 0, 1))
|
| 367 |
img_tensor = torch.from_numpy(img)
|
| 368 |
else:
|
| 369 |
-
img_tensor = image
|
| 370 |
|
| 371 |
-
# mask
|
| 372 |
if isinstance(mask, np.ndarray):
|
| 373 |
mask_tensor = mask.astype(np.float32)
|
| 374 |
if mask_tensor.max() > 1.0:
|
|
@@ -379,14 +371,20 @@ def _matanyone_refine(image: np.ndarray, mask: np.ndarray, matanyone_processor:
|
|
| 379 |
else:
|
| 380 |
mask_tensor = mask
|
| 381 |
|
|
|
|
| 382 |
device = getattr(matanyone_processor, 'device', 'cpu')
|
| 383 |
img_tensor = img_tensor.to(device)
|
| 384 |
mask_tensor = mask_tensor.to(device)
|
| 385 |
|
|
|
|
|
|
|
| 386 |
with torch.no_grad():
|
| 387 |
-
output_prob = matanyone_processor.step(img_tensor, mask_tensor, objects=
|
|
|
|
|
|
|
| 388 |
refined_mask_tensor = matanyone_processor.output_prob_to_mask(output_prob)
|
| 389 |
|
|
|
|
| 390 |
refined_mask = refined_mask_tensor.squeeze().detach().cpu().numpy()
|
| 391 |
if refined_mask.max() <= 1.0:
|
| 392 |
refined_mask = (refined_mask * 255).astype(np.uint8)
|
|
@@ -401,11 +399,11 @@ def _matanyone_refine(image: np.ndarray, mask: np.ndarray, matanyone_processor:
|
|
| 401 |
return None
|
| 402 |
|
| 403 |
# ============================================================================
|
| 404 |
-
# BACKGROUND REPLACEMENT
|
| 405 |
# ============================================================================
|
| 406 |
|
| 407 |
def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.ndarray,
|
| 408 |
-
|
| 409 |
"""Enhanced background replacement with comprehensive error handling"""
|
| 410 |
if frame is None or mask is None or background is None:
|
| 411 |
raise BackgroundReplacementError("Invalid input frame, mask, or background")
|
|
@@ -433,7 +431,8 @@ def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.nd
|
|
| 433 |
logger.warning(f"Advanced compositing failed: {e}")
|
| 434 |
if fallback_enabled:
|
| 435 |
return _simple_compositing(frame, mask, background)
|
| 436 |
-
|
|
|
|
| 437 |
|
| 438 |
except BackgroundReplacementError:
|
| 439 |
raise
|
|
@@ -441,62 +440,65 @@ def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.nd
|
|
| 441 |
logger.error(f"Unexpected background replacement error: {e}")
|
| 442 |
if fallback_enabled:
|
| 443 |
return _simple_compositing(frame, mask, background)
|
| 444 |
-
|
|
|
|
| 445 |
|
| 446 |
def create_professional_background(bg_config: Dict[str, Any] | str, width: int, height: int) -> np.ndarray:
|
| 447 |
"""
|
| 448 |
-
|
| 449 |
-
|
| 450 |
-
{'background_choice': 'minimalist', 'custom_path': '/path/to/image.jpg'}
|
| 451 |
-
(backwards compatible with older dicts that contained 'type'/'colors')
|
| 452 |
-
- If 'custom_path' exists, load and letterbox-fit it.
|
| 453 |
-
- Returns **BGR** (consistent with OpenCV).
|
| 454 |
"""
|
| 455 |
-
|
| 456 |
-
|
| 457 |
-
|
| 458 |
-
|
| 459 |
-
|
| 460 |
-
|
| 461 |
-
|
| 462 |
-
|
| 463 |
-
|
| 464 |
-
|
| 465 |
-
|
| 466 |
-
|
| 467 |
-
|
| 468 |
-
|
| 469 |
-
|
| 470 |
-
|
| 471 |
-
|
| 472 |
-
|
| 473 |
-
# 2) Old dict form with 'type'/'colors'
|
| 474 |
-
if "type" in bg_config and "colors" in bg_config:
|
| 475 |
-
if bg_config["type"] == "color":
|
| 476 |
-
background = _create_solid_background(bg_config, width, height) # already BGR
|
| 477 |
-
else:
|
| 478 |
-
background = _create_gradient_background_enhanced(bg_config, width, height) # returns BGR
|
| 479 |
-
return _apply_background_adjustments(background, bg_config)
|
| 480 |
|
| 481 |
-
|
| 482 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 483 |
|
| 484 |
-
|
| 485 |
-
|
| 486 |
-
if choice not in PROFESSIONAL_BACKGROUNDS:
|
| 487 |
-
choice = "minimalist"
|
| 488 |
-
cfg = PROFESSIONAL_BACKGROUNDS[choice]
|
| 489 |
|
| 490 |
-
|
| 491 |
-
|
| 492 |
-
|
| 493 |
-
|
|
|
|
| 494 |
|
| 495 |
-
|
| 496 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 497 |
|
| 498 |
# ============================================================================
|
| 499 |
-
# VALIDATION
|
| 500 |
# ============================================================================
|
| 501 |
|
| 502 |
def validate_video_file(video_path: str) -> Tuple[bool, str]:
|
|
@@ -551,10 +553,12 @@ def validate_video_file(video_path: str) -> Tuple[bool, str]:
|
|
| 551 |
def _segment_with_intelligent_prompts(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 552 |
"""Intelligent automatic prompt generation for segmentation with safe predictor access"""
|
| 553 |
try:
|
|
|
|
| 554 |
if predictor is None or not hasattr(predictor, 'predict'):
|
| 555 |
if fallback_enabled:
|
| 556 |
-
return
|
| 557 |
-
|
|
|
|
| 558 |
|
| 559 |
h, w = image.shape[:2]
|
| 560 |
pos_points, neg_points = _generate_smart_prompts(image)
|
|
@@ -592,16 +596,19 @@ def _segment_with_intelligent_prompts(image: np.ndarray, predictor: Any, fallbac
|
|
| 592 |
except Exception as e:
|
| 593 |
logger.error(f"Intelligent prompting failed: {e}")
|
| 594 |
if fallback_enabled:
|
| 595 |
-
return
|
| 596 |
-
|
|
|
|
| 597 |
|
| 598 |
def _segment_with_basic_prompts(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 599 |
"""Basic prompting method for segmentation with safe predictor access"""
|
| 600 |
try:
|
|
|
|
| 601 |
if predictor is None or not hasattr(predictor, 'predict'):
|
| 602 |
if fallback_enabled:
|
| 603 |
-
return
|
| 604 |
-
|
|
|
|
| 605 |
|
| 606 |
h, w = image.shape[:2]
|
| 607 |
|
|
@@ -639,57 +646,47 @@ def _segment_with_basic_prompts(image: np.ndarray, predictor: Any, fallback_enab
|
|
| 639 |
except Exception as e:
|
| 640 |
logger.error(f"Basic prompting failed: {e}")
|
| 641 |
if fallback_enabled:
|
| 642 |
-
return
|
| 643 |
-
|
|
|
|
| 644 |
|
| 645 |
def _generate_smart_prompts(image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
| 646 |
"""Generate optimal positive/negative points automatically"""
|
| 647 |
try:
|
| 648 |
h, w = image.shape[:2]
|
| 649 |
|
| 650 |
-
|
| 651 |
-
|
| 652 |
-
|
| 653 |
-
|
| 654 |
-
|
| 655 |
-
|
| 656 |
-
|
| 657 |
-
|
| 658 |
-
|
| 659 |
-
|
| 660 |
-
|
| 661 |
-
|
| 662 |
-
|
| 663 |
-
|
| 664 |
-
|
| 665 |
-
|
| 666 |
-
|
| 667 |
-
positive_points.append([cx, cy])
|
| 668 |
-
|
| 669 |
-
if positive_points:
|
| 670 |
-
logger.debug(f"Generated {len(positive_points)} saliency-based points")
|
| 671 |
-
positive_points = np.array(positive_points, dtype=np.float32)
|
| 672 |
-
else:
|
| 673 |
-
raise Exception("No valid saliency points found")
|
| 674 |
-
|
| 675 |
-
except Exception as e:
|
| 676 |
-
logger.debug(f"Saliency method failed: {e}, using fallback")
|
| 677 |
-
positive_points = np.array([
|
| 678 |
[w//2, h//3],
|
| 679 |
[w//2, h//2],
|
| 680 |
[w//2, 2*h//3],
|
| 681 |
-
]
|
| 682 |
|
| 683 |
-
negative_points =
|
| 684 |
[10, 10],
|
| 685 |
[w-10, 10],
|
| 686 |
[10, h-10],
|
| 687 |
[w-10, h-10],
|
| 688 |
[w//2, 5],
|
| 689 |
[w//2, h-5],
|
| 690 |
-
]
|
| 691 |
|
| 692 |
-
return positive_points, negative_points
|
| 693 |
|
| 694 |
except Exception as e:
|
| 695 |
logger.warning(f"Smart prompt generation failed: {e}")
|
|
@@ -698,6 +695,146 @@ def _generate_smart_prompts(image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
|
| 698 |
negative_points = np.array([[10, 10], [w-10, 10]], dtype=np.float32)
|
| 699 |
return positive_points, negative_points
|
| 700 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 701 |
# ============================================================================
|
| 702 |
# HELPER FUNCTIONS - REFINEMENT
|
| 703 |
# ============================================================================
|
|
@@ -887,44 +1024,43 @@ def _process_mask(mask: np.ndarray) -> np.ndarray:
|
|
| 887 |
|
| 888 |
except Exception as e:
|
| 889 |
logger.error(f"Mask processing failed: {e}")
|
| 890 |
-
h, w = mask.shape[:2] if len(mask.shape) >= 2 else (256, 256)
|
| 891 |
fallback = np.zeros((h, w), dtype=np.uint8)
|
| 892 |
fallback[h//4:3*h//4, w//4:3*w//4] = 255
|
| 893 |
return fallback
|
| 894 |
|
| 895 |
def _validate_mask_quality(mask: np.ndarray, image_shape: Tuple[int, int]) -> bool:
|
| 896 |
-
"""
|
| 897 |
-
Soft validator: only reject clearly broken masks.
|
| 898 |
-
- Accept area ratios roughly between 2% and 95%.
|
| 899 |
-
- Don't fail on center; just warn.
|
| 900 |
-
"""
|
| 901 |
try:
|
| 902 |
h, w = image_shape
|
| 903 |
-
|
| 904 |
-
|
| 905 |
-
area_ratio = mask_area / total_area
|
| 906 |
|
| 907 |
-
|
| 908 |
-
|
|
|
|
| 909 |
return False
|
| 910 |
|
| 911 |
-
|
| 912 |
-
|
| 913 |
-
|
|
|
|
|
|
|
| 914 |
return False
|
| 915 |
|
| 916 |
-
|
| 917 |
-
|
| 918 |
-
|
|
|
|
| 919 |
|
| 920 |
return True
|
| 921 |
|
| 922 |
except Exception as e:
|
| 923 |
-
logger.warning(f"Mask validation error
|
| 924 |
return True
|
| 925 |
|
| 926 |
def _fallback_segmentation(image: np.ndarray) -> np.ndarray:
|
| 927 |
-
"""
|
| 928 |
try:
|
| 929 |
logger.info("Using fallback segmentation strategy")
|
| 930 |
h, w = image.shape[:2]
|
|
@@ -951,15 +1087,8 @@ def _fallback_segmentation(image: np.ndarray) -> np.ndarray:
|
|
| 951 |
except Exception as e:
|
| 952 |
logger.warning(f"Background subtraction fallback failed: {e}")
|
| 953 |
|
| 954 |
-
|
| 955 |
-
|
| 956 |
-
center_x, center_y = w // 2, h // 2
|
| 957 |
-
radius_x, radius_y = w // 3, h // 2.5
|
| 958 |
-
|
| 959 |
-
y, x = np.ogrid[:h, :w]
|
| 960 |
-
mask_ellipse = ((x - center_x) / radius_x) ** 2 + ((y - center_y) / radius_y) ** 2 <= 1
|
| 961 |
-
mask[mask_ellipse] = 255
|
| 962 |
-
|
| 963 |
logger.info("Using geometric fallback mask")
|
| 964 |
return mask
|
| 965 |
|
|
@@ -1016,10 +1145,9 @@ def _advanced_compositing(frame: np.ndarray, mask: np.ndarray, background: np.nd
|
|
| 1016 |
mask_binary = cv2.morphologyEx(mask_binary, cv2.MORPH_CLOSE, kernel)
|
| 1017 |
mask_binary = cv2.morphologyEx(mask_binary, cv2.MORPH_OPEN, kernel)
|
| 1018 |
|
| 1019 |
-
mask_smooth = cv2.GaussianBlur(mask_binary.astype(np.float32), (5, 5), 1.0)
|
| 1020 |
-
mask_smooth = mask_smooth / 255.0
|
| 1021 |
-
|
| 1022 |
mask_smooth = np.power(mask_smooth, 0.8)
|
|
|
|
| 1023 |
mask_smooth = np.where(mask_smooth > 0.5,
|
| 1024 |
np.minimum(mask_smooth * 1.1, 1.0),
|
| 1025 |
mask_smooth * 0.9)
|
|
@@ -1098,14 +1226,14 @@ def _simple_compositing(frame: np.ndarray, mask: np.ndarray, background: np.ndar
|
|
| 1098 |
# ============================================================================
|
| 1099 |
|
| 1100 |
def _create_solid_background(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
|
| 1101 |
-
"""Create solid color background (
|
| 1102 |
color_hex = bg_config["colors"][0].lstrip('#')
|
| 1103 |
color_rgb = tuple(int(color_hex[i:i+2], 16) for i in (0, 2, 4))
|
| 1104 |
color_bgr = color_rgb[::-1]
|
| 1105 |
return np.full((height, width, 3), color_bgr, dtype=np.uint8)
|
| 1106 |
|
| 1107 |
def _create_gradient_background_enhanced(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
|
| 1108 |
-
"""Create enhanced gradient background with better quality (
|
| 1109 |
try:
|
| 1110 |
colors = bg_config["colors"]
|
| 1111 |
direction = bg_config.get("direction", "vertical")
|
|
@@ -1137,25 +1265,26 @@ def _create_gradient_background_enhanced(bg_config: Dict[str, Any], width: int,
|
|
| 1137 |
return np.full((height, width, 3), (128, 128, 128), dtype=np.uint8)
|
| 1138 |
|
| 1139 |
def _create_vertical_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
|
|
|
| 1140 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
| 1141 |
for y in range(height):
|
| 1142 |
-
progress = y /
|
| 1143 |
-
|
| 1144 |
-
gradient[y, :] = color
|
| 1145 |
return gradient
|
| 1146 |
|
| 1147 |
def _create_horizontal_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
|
|
|
| 1148 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
| 1149 |
for x in range(width):
|
| 1150 |
-
progress = x /
|
| 1151 |
-
|
| 1152 |
-
gradient[:, x] = color
|
| 1153 |
return gradient
|
| 1154 |
|
| 1155 |
def _create_diagonal_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
|
|
|
| 1156 |
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
| 1157 |
max_distance = width + height
|
| 1158 |
-
progress = (x_coords + y_coords) / max_distance
|
| 1159 |
progress = np.clip(progress, 0, 1)
|
| 1160 |
|
| 1161 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
|
@@ -1164,12 +1293,13 @@ def _create_diagonal_gradient(colors: list, width: int, height: int) -> np.ndarr
|
|
| 1164 |
return gradient
|
| 1165 |
|
| 1166 |
def _create_radial_gradient(colors: list, width: int, height: int, soft: bool = False) -> np.ndarray:
|
|
|
|
| 1167 |
center_x, center_y = width // 2, height // 2
|
| 1168 |
max_distance = np.sqrt(center_x**2 + center_y**2)
|
| 1169 |
|
| 1170 |
-
|
| 1171 |
-
distances = np.sqrt((
|
| 1172 |
-
progress = distances / max_distance
|
| 1173 |
progress = np.clip(progress, 0, 1)
|
| 1174 |
|
| 1175 |
if soft:
|
|
@@ -1178,9 +1308,11 @@ def _create_radial_gradient(colors: list, width: int, height: int, soft: bool =
|
|
| 1178 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
| 1179 |
for c in range(3):
|
| 1180 |
gradient[:, :, c] = _vectorized_color_interpolation(colors, progress, c)
|
|
|
|
| 1181 |
return gradient
|
| 1182 |
|
| 1183 |
def _vectorized_color_interpolation(colors: list, progress: np.ndarray, channel: int) -> np.ndarray:
|
|
|
|
| 1184 |
if len(colors) == 1:
|
| 1185 |
return np.full_like(progress, colors[0][channel], dtype=np.uint8)
|
| 1186 |
|
|
@@ -1200,6 +1332,7 @@ def _vectorized_color_interpolation(colors: list, progress: np.ndarray, channel:
|
|
| 1200 |
return np.clip(result, 0, 255).astype(np.uint8)
|
| 1201 |
|
| 1202 |
def _interpolate_color(colors: list, progress: float) -> tuple:
|
|
|
|
| 1203 |
if len(colors) == 1:
|
| 1204 |
return colors[0]
|
| 1205 |
elif len(colors) == 2:
|
|
@@ -1210,7 +1343,7 @@ def _interpolate_color(colors: list, progress: float) -> tuple:
|
|
| 1210 |
else:
|
| 1211 |
segment = progress * (len(colors) - 1)
|
| 1212 |
idx = int(segment)
|
| 1213 |
-
local_progress = segment - idx
|
| 1214 |
if idx >= len(colors) - 1:
|
| 1215 |
return colors[-1]
|
| 1216 |
c1, c2 = colors[idx], colors[idx + 1]
|
|
@@ -1220,6 +1353,7 @@ def _interpolate_color(colors: list, progress: float) -> tuple:
|
|
| 1220 |
return (r, g, b)
|
| 1221 |
|
| 1222 |
def _apply_background_adjustments(background: np.ndarray, bg_config: Dict[str, Any]) -> np.ndarray:
|
|
|
|
| 1223 |
try:
|
| 1224 |
brightness = bg_config.get("brightness", 1.0)
|
| 1225 |
contrast = bg_config.get("contrast", 1.0)
|
|
|
|
| 3 |
Contains segmentation, mask refinement, background replacement, and helper functions
|
| 4 |
"""
|
| 5 |
|
| 6 |
+
# ---- Early thread env (defensive) ----
|
| 7 |
import os
|
| 8 |
if 'OMP_NUM_THREADS' not in os.environ:
|
| 9 |
os.environ['OMP_NUM_THREADS'] = '4'
|
|
|
|
| 23 |
|
| 24 |
# Version control flags for CV functions
|
| 25 |
USE_ENHANCED_SEGMENTATION = True
|
| 26 |
+
USE_AUTO_TEMPORAL_CONSISTENCY = True # reserved for future temporal smoothing
|
| 27 |
USE_INTELLIGENT_PROMPTING = True
|
| 28 |
USE_ITERATIVE_REFINEMENT = True
|
| 29 |
|
| 30 |
+
# Validator thresholds (softened to avoid false negatives)
|
| 31 |
+
MIN_AREA_RATIO = 0.015 # 1.5% of frame
|
| 32 |
+
MAX_AREA_RATIO = 0.97 # 97% of frame
|
| 33 |
+
|
| 34 |
+
# GrabCut / saliency config
|
| 35 |
+
GRABCUT_ITERS = 3
|
| 36 |
+
SALIENCY_THRESH = 0.65
|
| 37 |
+
|
| 38 |
# Professional background templates
|
| 39 |
PROFESSIONAL_BACKGROUNDS = {
|
| 40 |
"office_modern": {
|
|
|
|
| 110 |
pass
|
| 111 |
|
| 112 |
# ============================================================================
|
| 113 |
+
# LETTERBOX FIT (RGB in, RGB out) for custom background images
|
| 114 |
# ============================================================================
|
| 115 |
|
| 116 |
def _fit_image_letterbox(img_rgb: np.ndarray, dst_w: int, dst_h: int, fill=(32, 32, 32)) -> np.ndarray:
|
|
|
|
|
|
|
|
|
|
|
|
|
| 117 |
h, w = img_rgb.shape[:2]
|
| 118 |
if h == 0 or w == 0:
|
| 119 |
return np.full((dst_h, dst_w, 3), fill, dtype=np.uint8)
|
|
|
|
| 140 |
# ============================================================================
|
| 141 |
|
| 142 |
def segment_person_hq(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 143 |
+
"""High-quality person segmentation with intelligent automation and robust cascade"""
|
| 144 |
if not USE_ENHANCED_SEGMENTATION:
|
| 145 |
return segment_person_hq_original(image, predictor, fallback_enabled)
|
| 146 |
|
|
|
|
| 150 |
raise SegmentationError("Invalid input image")
|
| 151 |
|
| 152 |
try:
|
| 153 |
+
# 1) SAM2 (if available)
|
| 154 |
+
if predictor and hasattr(predictor, 'set_image') and hasattr(predictor, 'predict'):
|
| 155 |
+
try:
|
| 156 |
+
predictor.set_image(image)
|
| 157 |
+
if USE_INTELLIGENT_PROMPTING:
|
| 158 |
+
mask = _segment_with_intelligent_prompts(image, predictor, fallback_enabled=True)
|
| 159 |
+
else:
|
| 160 |
+
mask = _segment_with_basic_prompts(image, predictor, fallback_enabled=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 161 |
|
| 162 |
+
if USE_ITERATIVE_REFINEMENT and mask is not None:
|
| 163 |
+
mask = _auto_refine_mask_iteratively(image, mask, predictor)
|
|
|
|
|
|
|
| 164 |
|
| 165 |
+
if _validate_mask_quality(mask, image.shape[:2]):
|
| 166 |
+
logger.debug("SAM2 mask accepted by validator")
|
| 167 |
+
return mask
|
| 168 |
+
logger.warning("SAM2 mask failed validation; cascading to classical methods.")
|
| 169 |
+
except Exception as e:
|
| 170 |
+
logger.warning(f"SAM2 segmentation error: {e}")
|
| 171 |
|
| 172 |
+
# 2) Classical cascade when SAM2 is absent/weak
|
| 173 |
+
classical = _classical_segmentation_cascade(image)
|
| 174 |
+
if _validate_mask_quality(classical, image.shape[:2]):
|
| 175 |
+
logger.debug("Classical cascade mask accepted by validator")
|
| 176 |
+
return classical
|
| 177 |
|
| 178 |
+
logger.warning("Classical cascade produced weak mask; using geometric fallback.")
|
| 179 |
+
return _geometric_person_mask(image)
|
| 180 |
|
|
|
|
|
|
|
| 181 |
except Exception as e:
|
| 182 |
logger.error(f"Unexpected segmentation error: {e}")
|
| 183 |
if fallback_enabled:
|
| 184 |
+
return _geometric_person_mask(image)
|
| 185 |
+
else:
|
| 186 |
+
raise SegmentationError(f"Unexpected error: {e}")
|
| 187 |
|
| 188 |
def segment_person_hq_original(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 189 |
"""Original version of person segmentation for rollback"""
|
|
|
|
| 191 |
raise SegmentationError("Invalid input image")
|
| 192 |
|
| 193 |
try:
|
| 194 |
+
# SAFE PREDICTOR CHECK
|
| 195 |
+
if predictor and hasattr(predictor, 'set_image') and hasattr(predictor, 'predict'):
|
| 196 |
+
h, w = image.shape[:2]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 197 |
predictor.set_image(image)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 198 |
|
| 199 |
+
points = np.array([
|
| 200 |
+
[w//2, h//4],
|
| 201 |
+
[w//2, h//2],
|
| 202 |
+
[w//2, 3*h//4],
|
| 203 |
+
[w//3, h//2],
|
| 204 |
+
[2*w//3, h//2],
|
| 205 |
+
[w//2, h//6],
|
| 206 |
+
[w//4, 2*h//3],
|
| 207 |
+
[3*w//4, 2*h//3],
|
| 208 |
+
], dtype=np.float32)
|
|
|
|
|
|
|
| 209 |
|
| 210 |
+
labels = np.ones(len(points), dtype=np.int32)
|
| 211 |
|
|
|
|
| 212 |
with torch.no_grad():
|
| 213 |
masks, scores, _ = predictor.predict(
|
| 214 |
point_coords=points,
|
| 215 |
point_labels=labels,
|
| 216 |
multimask_output=True
|
| 217 |
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 218 |
|
| 219 |
+
if masks is None or len(masks) == 0:
|
| 220 |
+
logger.warning("SAM2 returned no masks")
|
| 221 |
+
else:
|
| 222 |
+
best_idx = np.argmax(scores) if (scores is not None and len(scores) > 0) else 0
|
| 223 |
+
best_mask = masks[best_idx]
|
| 224 |
+
mask = _process_mask(best_mask)
|
| 225 |
+
if _validate_mask_quality(mask, image.shape[:2]):
|
| 226 |
+
logger.debug("Original SAM2 mask accepted by validator")
|
| 227 |
+
return mask
|
| 228 |
|
| 229 |
+
if fallback_enabled:
|
| 230 |
+
logger.warning("Falling back to classical segmentation")
|
| 231 |
+
return _classical_segmentation_cascade(image)
|
| 232 |
else:
|
| 233 |
+
raise SegmentationError("SAM2 failed and fallback disabled")
|
|
|
|
|
|
|
|
|
|
|
|
|
| 234 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 235 |
except Exception as e:
|
| 236 |
logger.error(f"Unexpected segmentation error: {e}")
|
| 237 |
if fallback_enabled:
|
| 238 |
+
return _classical_segmentation_cascade(image)
|
| 239 |
+
else:
|
| 240 |
+
raise SegmentationError(f"Unexpected error: {e}")
|
| 241 |
|
| 242 |
# ============================================================================
|
| 243 |
+
# MASK REFINEMENT FUNCTIONS
|
| 244 |
# ============================================================================
|
| 245 |
|
| 246 |
def refine_mask_hq(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any,
|
|
|
|
| 252 |
try:
|
| 253 |
mask = _process_mask(mask)
|
| 254 |
|
| 255 |
+
# 1) MatAnyOne (if present)
|
| 256 |
if matanyone_processor is not None:
|
| 257 |
try:
|
| 258 |
logger.debug("Attempting MatAnyone refinement")
|
|
|
|
| 262 |
logger.debug("MatAnyone refinement successful")
|
| 263 |
return refined_mask
|
| 264 |
else:
|
| 265 |
+
logger.warning("MatAnyOne produced poor quality mask")
|
| 266 |
|
| 267 |
except Exception as e:
|
| 268 |
+
logger.warning(f"MatAnyOne refinement failed: {e}")
|
| 269 |
|
| 270 |
+
# 2) Advanced OpenCV refinement
|
| 271 |
+
try:
|
| 272 |
logger.debug("Using enhanced OpenCV refinement")
|
| 273 |
+
opencv_mask = enhance_mask_opencv_advanced(image, mask)
|
| 274 |
+
if _validate_mask_quality(opencv_mask, image.shape[:2]):
|
| 275 |
+
return opencv_mask
|
| 276 |
+
except Exception as e:
|
| 277 |
+
logger.warning(f"OpenCV advanced refinement failed: {e}")
|
| 278 |
+
|
| 279 |
+
# 3) GrabCut refinement (auto rect from saliency)
|
| 280 |
+
try:
|
| 281 |
+
logger.debug("Using GrabCut refinement fallback")
|
| 282 |
+
gc_mask = _refine_with_grabcut(image, mask)
|
| 283 |
+
if _validate_mask_quality(gc_mask, image.shape[:2]):
|
| 284 |
+
return gc_mask
|
| 285 |
+
except Exception as e:
|
| 286 |
+
logger.warning(f"GrabCut refinement failed: {e}")
|
| 287 |
|
| 288 |
+
# 4) Saliency flood-fill refinement
|
| 289 |
+
try:
|
| 290 |
+
logger.debug("Using saliency refinement fallback")
|
| 291 |
+
sal_mask = _refine_with_saliency(image, mask)
|
| 292 |
+
if _validate_mask_quality(sal_mask, image.shape[:2]):
|
| 293 |
+
return sal_mask
|
| 294 |
+
except Exception as e:
|
| 295 |
+
logger.warning(f"Saliency refinement failed: {e}")
|
| 296 |
+
|
| 297 |
+
if fallback_enabled:
|
| 298 |
+
logger.debug("Returning original mask after failed refinements")
|
| 299 |
+
return mask
|
| 300 |
+
else:
|
| 301 |
+
raise MaskRefinementError("All refinements failed")
|
| 302 |
|
| 303 |
except MaskRefinementError:
|
| 304 |
raise
|
|
|
|
| 306 |
logger.error(f"Unexpected mask refinement error: {e}")
|
| 307 |
if fallback_enabled:
|
| 308 |
return enhance_mask_opencv_advanced(image, mask)
|
| 309 |
+
else:
|
| 310 |
+
raise MaskRefinementError(f"Unexpected error: {e}")
|
| 311 |
|
| 312 |
def enhance_mask_opencv_advanced(image: np.ndarray, mask: np.ndarray) -> np.ndarray:
|
| 313 |
"""Advanced OpenCV-based mask enhancement with multiple techniques"""
|
|
|
|
| 328 |
refined_mask = cv2.morphologyEx(refined_mask, cv2.MORPH_OPEN, kernel_open)
|
| 329 |
|
| 330 |
refined_mask = cv2.GaussianBlur(refined_mask, (3, 3), 0.8)
|
| 331 |
+
|
| 332 |
_, refined_mask = cv2.threshold(refined_mask, 127, 255, cv2.THRESH_BINARY)
|
| 333 |
|
| 334 |
return refined_mask
|
|
|
|
| 344 |
def _matanyone_refine(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any) -> Optional[np.ndarray]:
|
| 345 |
"""Safe MatAnyOne refinement for a single frame with correct interface."""
|
| 346 |
try:
|
| 347 |
+
# Check for correct MatAnyOne interface
|
| 348 |
if not hasattr(matanyone_processor, 'step') or not hasattr(matanyone_processor, 'output_prob_to_mask'):
|
| 349 |
logger.warning("MatAnyOne processor missing required methods (step, output_prob_to_mask)")
|
| 350 |
return None
|
| 351 |
|
| 352 |
+
# Preprocess image: ensure float32, RGB, (C, H, W)
|
| 353 |
if isinstance(image, np.ndarray):
|
| 354 |
img = image.astype(np.float32)
|
| 355 |
if img.max() > 1.0:
|
| 356 |
img /= 255.0
|
| 357 |
if img.shape[2] == 3:
|
| 358 |
+
img = np.transpose(img, (2, 0, 1)) # (H, W, C) → (C, H, W)
|
| 359 |
img_tensor = torch.from_numpy(img)
|
| 360 |
else:
|
| 361 |
+
img_tensor = image # assume already tensor
|
| 362 |
|
| 363 |
+
# Preprocess mask: ensure float32, (H, W)
|
| 364 |
if isinstance(mask, np.ndarray):
|
| 365 |
mask_tensor = mask.astype(np.float32)
|
| 366 |
if mask_tensor.max() > 1.0:
|
|
|
|
| 371 |
else:
|
| 372 |
mask_tensor = mask
|
| 373 |
|
| 374 |
+
# Move tensors to processor's device if available
|
| 375 |
device = getattr(matanyone_processor, 'device', 'cpu')
|
| 376 |
img_tensor = img_tensor.to(device)
|
| 377 |
mask_tensor = mask_tensor.to(device)
|
| 378 |
|
| 379 |
+
# Step: encode mask on this frame
|
| 380 |
+
objects = [1] # single object id
|
| 381 |
with torch.no_grad():
|
| 382 |
+
output_prob = matanyone_processor.step(img_tensor, mask_tensor, objects=objects)
|
| 383 |
+
# MatAnyOne returns output_prob as tensor
|
| 384 |
+
|
| 385 |
refined_mask_tensor = matanyone_processor.output_prob_to_mask(output_prob)
|
| 386 |
|
| 387 |
+
# Convert to numpy and to uint8
|
| 388 |
refined_mask = refined_mask_tensor.squeeze().detach().cpu().numpy()
|
| 389 |
if refined_mask.max() <= 1.0:
|
| 390 |
refined_mask = (refined_mask * 255).astype(np.uint8)
|
|
|
|
| 399 |
return None
|
| 400 |
|
| 401 |
# ============================================================================
|
| 402 |
+
# BACKGROUND REPLACEMENT FUNCTIONS
|
| 403 |
# ============================================================================
|
| 404 |
|
| 405 |
def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.ndarray,
|
| 406 |
+
fallback_enabled: bool = True) -> np.ndarray:
|
| 407 |
"""Enhanced background replacement with comprehensive error handling"""
|
| 408 |
if frame is None or mask is None or background is None:
|
| 409 |
raise BackgroundReplacementError("Invalid input frame, mask, or background")
|
|
|
|
| 431 |
logger.warning(f"Advanced compositing failed: {e}")
|
| 432 |
if fallback_enabled:
|
| 433 |
return _simple_compositing(frame, mask, background)
|
| 434 |
+
else:
|
| 435 |
+
raise BackgroundReplacementError(f"Advanced compositing failed: {e}")
|
| 436 |
|
| 437 |
except BackgroundReplacementError:
|
| 438 |
raise
|
|
|
|
| 440 |
logger.error(f"Unexpected background replacement error: {e}")
|
| 441 |
if fallback_enabled:
|
| 442 |
return _simple_compositing(frame, mask, background)
|
| 443 |
+
else:
|
| 444 |
+
raise BackgroundReplacementError(f"Unexpected error: {e}")
|
| 445 |
|
| 446 |
def create_professional_background(bg_config: Dict[str, Any] | str, width: int, height: int) -> np.ndarray:
    """
    Enhanced professional background creation with quality improvements.

    Accepts either a style-name string or a dict. A dict may carry:
      - "custom_path": path to an image used as the background (letterboxed),
      - "type"/"colors": an inline background spec (solid or gradient),
      - "background_choice" or "name": a named style key.
    Returns a (height, width, 3) uint8 BGR image (OpenCV convention); falls
    back to a flat gray canvas on any error so callers never receive None.
    """
    try:
        choice = "minimalist"
        custom_path = None

        if isinstance(bg_config, dict):
            choice = bg_config.get("background_choice", bg_config.get("name", "minimalist"))
            custom_path = bg_config.get("custom_path")

            # Custom background path (letterboxed + BGR out)
            if custom_path and os.path.exists(custom_path):
                img_bgr = cv2.imread(custom_path, cv2.IMREAD_COLOR)
                if img_bgr is not None:
                    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
                    fitted_rgb = _fit_image_letterbox(img_rgb, width, height, fill=(32, 32, 32))
                    return cv2.cvtColor(fitted_rgb, cv2.COLOR_RGB2BGR)
                else:
                    logger.warning(f"Failed to read custom background at {custom_path}. Falling back to style.")

            # Direct dict colors/type form support
            if "type" in bg_config and "colors" in bg_config:
                if bg_config["type"] == "color":
                    background = _create_solid_background(bg_config, width, height)
                else:
                    background = _create_gradient_background_enhanced(bg_config, width, height)
                return _apply_background_adjustments(background, bg_config)

        elif isinstance(bg_config, str):
            choice = bg_config

        # Normalize the style key. Fix: coerce to str first so a non-string
        # value (e.g. a numeric "name" in the dict) degrades to the
        # minimalist style instead of raising AttributeError on .lower()
        # and returning the gray error canvas.
        choice = str(choice).lower() if choice else "minimalist"
        if choice not in PROFESSIONAL_BACKGROUNDS:
            choice = "minimalist"

        cfg = PROFESSIONAL_BACKGROUNDS[choice]

        if cfg.get("type") == "color":
            background = _create_solid_background(cfg, width, height)
        else:
            background = _create_gradient_background_enhanced(cfg, width, height)

        return _apply_background_adjustments(background, cfg)

    except Exception as e:
        logger.error(f"Background creation error: {e}")
        return np.full((height, width, 3), (128, 128, 128), dtype=np.uint8)
|
| 499 |
|
| 500 |
# ============================================================================
|
| 501 |
+
# VALIDATION FUNCTION
|
| 502 |
# ============================================================================
|
| 503 |
|
| 504 |
def validate_video_file(video_path: str) -> Tuple[bool, str]:
|
|
|
|
| 553 |
def _segment_with_intelligent_prompts(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 554 |
"""Intelligent automatic prompt generation for segmentation with safe predictor access"""
|
| 555 |
try:
|
| 556 |
+
# Double-check predictor validity
|
| 557 |
if predictor is None or not hasattr(predictor, 'predict'):
|
| 558 |
if fallback_enabled:
|
| 559 |
+
return _classical_segmentation_cascade(image)
|
| 560 |
+
else:
|
| 561 |
+
raise SegmentationError("Invalid predictor in intelligent prompts")
|
| 562 |
|
| 563 |
h, w = image.shape[:2]
|
| 564 |
pos_points, neg_points = _generate_smart_prompts(image)
|
|
|
|
| 596 |
except Exception as e:
|
| 597 |
logger.error(f"Intelligent prompting failed: {e}")
|
| 598 |
if fallback_enabled:
|
| 599 |
+
return _classical_segmentation_cascade(image)
|
| 600 |
+
else:
|
| 601 |
+
raise
|
| 602 |
|
| 603 |
def _segment_with_basic_prompts(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
|
| 604 |
"""Basic prompting method for segmentation with safe predictor access"""
|
| 605 |
try:
|
| 606 |
+
# Double-check predictor validity
|
| 607 |
if predictor is None or not hasattr(predictor, 'predict'):
|
| 608 |
if fallback_enabled:
|
| 609 |
+
return _classical_segmentation_cascade(image)
|
| 610 |
+
else:
|
| 611 |
+
raise SegmentationError("Invalid predictor in basic prompts")
|
| 612 |
|
| 613 |
h, w = image.shape[:2]
|
| 614 |
|
|
|
|
| 646 |
except Exception as e:
|
| 647 |
logger.error(f"Basic prompting failed: {e}")
|
| 648 |
if fallback_enabled:
|
| 649 |
+
return _classical_segmentation_cascade(image)
|
| 650 |
+
else:
|
| 651 |
+
raise
|
| 652 |
|
| 653 |
def _generate_smart_prompts(image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
| 654 |
"""Generate optimal positive/negative points automatically"""
|
| 655 |
try:
|
| 656 |
h, w = image.shape[:2]
|
| 657 |
|
| 658 |
+
saliency = _compute_saliency(image)
|
| 659 |
+
positive_points = []
|
| 660 |
+
if saliency is not None:
|
| 661 |
+
saliency_thresh = (saliency > (SALIENCY_THRESH - 0.1)).astype(np.uint8) * 255
|
| 662 |
+
contours, _ = cv2.findContours(saliency_thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
| 663 |
+
|
| 664 |
+
if contours:
|
| 665 |
+
for contour in sorted(contours, key=cv2.contourArea, reverse=True)[:3]:
|
| 666 |
+
M = cv2.moments(contour)
|
| 667 |
+
if M["m00"] != 0:
|
| 668 |
+
cx = int(M["m10"] / M["m00"])
|
| 669 |
+
cy = int(M["m01"] / M["m00"])
|
| 670 |
+
if 0 < cx < w and 0 < cy < h:
|
| 671 |
+
positive_points.append([cx, cy])
|
| 672 |
+
|
| 673 |
+
if not positive_points:
|
| 674 |
+
positive_points = [
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 675 |
[w//2, h//3],
|
| 676 |
[w//2, h//2],
|
| 677 |
[w//2, 2*h//3],
|
| 678 |
+
]
|
| 679 |
|
| 680 |
+
negative_points = [
|
| 681 |
[10, 10],
|
| 682 |
[w-10, 10],
|
| 683 |
[10, h-10],
|
| 684 |
[w-10, h-10],
|
| 685 |
[w//2, 5],
|
| 686 |
[w//2, h-5],
|
| 687 |
+
]
|
| 688 |
|
| 689 |
+
return np.array(positive_points, dtype=np.float32), np.array(negative_points, dtype=np.float32)
|
| 690 |
|
| 691 |
except Exception as e:
|
| 692 |
logger.warning(f"Smart prompt generation failed: {e}")
|
|
|
|
| 695 |
negative_points = np.array([[10, 10], [w-10, 10]], dtype=np.float32)
|
| 696 |
return positive_points, negative_points
|
| 697 |
|
| 698 |
+
# ============================================================================
|
| 699 |
+
# CLASSICAL SEGMENTATION CASCADE
|
| 700 |
+
# ============================================================================
|
| 701 |
+
|
| 702 |
+
def _classical_segmentation_cascade(image: np.ndarray) -> np.ndarray:
    """
    Robust non-AI segmentation cascade; returns a uint8 mask (0/255).

    Order of attempts:
      1) Background subtraction via edge-median intensity
      2) Saliency flood-fill
      3) GrabCut from auto-rect
      4) Geometric ellipse (final fallback)
    """
    # Seed mask shared with the later stages. Fix: initialize it explicitly
    # instead of the original's fragile `mask if 'mask' in locals()` checks.
    seed_mask = np.zeros(image.shape[:2], dtype=np.uint8)

    # 1) Background subtraction: treat the median intensity of the image
    #    border as background and threshold the absolute difference.
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        edge_pixels = np.concatenate([
            gray[0, :], gray[-1, :], gray[:, 0], gray[:, -1]
        ])
        bg_color = np.median(edge_pixels)

        diff = np.abs(gray.astype(float) - bg_color)
        seed_mask = (diff > 30).astype(np.uint8) * 255

        # Close small holes, then remove speckle noise.
        seed_mask = cv2.morphologyEx(seed_mask, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7)))
        seed_mask = cv2.morphologyEx(seed_mask, cv2.MORPH_OPEN, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)))

        if _validate_mask_quality(seed_mask, image.shape[:2]):
            logger.info("Background subtraction fallback successful")
            return seed_mask

    except Exception as e:
        logger.debug(f"Background subtraction fallback failed: {e}")

    # 2) Saliency flood-fill refinement, seeded from stage 1's result
    #    (whatever was last assigned, or the empty mask).
    try:
        sal_ref = _refine_with_saliency(image, seed_mask)
        if _validate_mask_quality(sal_ref, image.shape[:2]):
            return sal_ref
    except Exception as e:
        logger.debug(f"Saliency cascade failed: {e}")

    # 3) GrabCut refinement
    try:
        gc_mask = _refine_with_grabcut(image, seed_mask)
        if _validate_mask_quality(gc_mask, image.shape[:2]):
            return gc_mask
    except Exception as e:
        logger.debug(f"GrabCut cascade failed: {e}")

    # 4) Geometric final fallback — always succeeds.
    logger.info("Using geometric fallback mask")
    return _geometric_person_mask(image)
|
| 751 |
+
|
| 752 |
+
# ============================================================================
|
| 753 |
+
# SALIENCY / GRABCUT HELPERS
|
| 754 |
+
# ============================================================================
|
| 755 |
+
|
| 756 |
+
def _compute_saliency(image: np.ndarray) -> Optional[np.ndarray]:
    """Return a saliency map normalized to [0, 1], or None if unavailable."""
    # Preferred path: OpenCV contrib spectral-residual saliency detector.
    try:
        if hasattr(cv2, "saliency"):
            detector = cv2.saliency.StaticSaliencySpectralResidual_create()
            success, sal_map = detector.computeSaliency(image)
            if success:
                span = max(1e-6, (sal_map.max() - sal_map.min()))
                return (sal_map - sal_map.min()) / span
    except Exception:
        pass
    # Fallback: crude spectral-residual approximation via a DCT high-pass —
    # zero out the coarse (low-frequency) coefficients and reconstruct.
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
        log_img = np.log(gray + 1e-6)
        coeffs = cv2.dct(log_img)
        coeffs[:5, :5] = 0
        residual = cv2.idct(coeffs)
        span = max(1e-6, (residual.max() - residual.min()))
        return (residual - residual.min()) / span
    except Exception:
        return None
|
| 777 |
+
|
| 778 |
+
def _auto_person_rect(image: np.ndarray) -> Optional[Tuple[int, int, int, int]]:
    """Estimate a bounding rect (x, y, w, h) for the main subject via saliency."""
    saliency_map = _compute_saliency(image)
    if saliency_map is None:
        return None

    binary = (saliency_map > SALIENCY_THRESH).astype(np.uint8) * 255
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None

    largest = max(contours, key=cv2.contourArea)
    x, y, w, h = cv2.boundingRect(largest)

    # Inflate the rect by 5% per side, clamped to the image bounds.
    H, W = image.shape[:2]
    pad_x = int(0.05 * w)
    pad_y = int(0.05 * h)
    x = max(0, x - pad_x)
    y = max(0, y - pad_y)
    w = min(W - x, w + 2 * pad_x)
    h = min(H - y, h + 2 * pad_y)
    return (x, y, w, h)
|
| 794 |
+
|
| 795 |
+
def _refine_with_grabcut(image: np.ndarray, seed_mask: np.ndarray) -> np.ndarray:
    """
    Refine a seed mask with GrabCut; returns a binary uint8 mask (0/255).

    Pixels the seed marks confidently (>200) become definite foreground.
    The automatically estimated subject rect is encoded into the init mask
    as probable foreground — fix: the original passed the rect to
    cv2.grabCut but the GC_INIT_WITH_MASK mode discards the rect argument,
    so the rect never influenced the result (and an empty seed left the
    whole init mask PR_BGD, which grabCut rejects).
    """
    h, w = image.shape[:2]
    gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)

    # Subject rect: saliency-based when available, centered heuristic otherwise.
    rect = _auto_person_rect(image)
    if rect is not None:
        x, y, rw, rh = rect
    else:
        rw, rh = int(w * 0.5), int(h * 0.7)
        x, y = (w - rw) // 2, int(h * 0.15)

    # Rect interior is probable foreground; confident seed pixels override
    # it as definite foreground (applied last so GC_FGD wins).
    gc_mask[y:y + rh, x:x + rw] = cv2.GC_PR_FGD
    gc_mask[seed_mask > 200] = cv2.GC_FGD

    bgdModel = np.zeros((1, 65), np.float64)
    fgdModel = np.zeros((1, 65), np.float64)

    cv2.grabCut(image, gc_mask, (x, y, rw, rh), bgdModel, fgdModel, GRABCUT_ITERS, cv2.GC_INIT_WITH_MASK)

    # Keep definite + probable foreground, then close small gaps.
    mask_bin = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 255, 0).astype(np.uint8)
    mask_bin = cv2.morphologyEx(mask_bin, cv2.MORPH_CLOSE, np.ones((3, 3), np.uint8))
    return mask_bin
|
| 816 |
+
|
| 817 |
+
def _refine_with_saliency(image: np.ndarray, seed_mask: np.ndarray) -> np.ndarray:
    """
    Refine a seed mask using a saliency map.

    Thresholds the saliency map, then flood-fills the salient blob that
    contains the seed mask's center of mass. Returns the input seed mask
    unchanged when no saliency map can be computed.
    """
    sal = _compute_saliency(image)
    if sal is None:
        return seed_mask
    th = (sal > SALIENCY_THRESH).astype(np.uint8) * 255

    # Anchor point: seed center of mass, or image center if the seed is empty.
    ys, xs = np.where(seed_mask > 127)
    if len(ys) > 0:
        cx, cy = int(np.mean(xs)), int(np.mean(ys))
    else:
        h, w = image.shape[:2]
        cx, cy = w // 2, h // 2

    h, w = th.shape
    # Clamp the anchor inside the saliency map so floodFill cannot be handed
    # an out-of-range seed point (cheap insurance if mask/image sizes differ).
    cx = min(max(cx, 0), w - 1)
    cy = min(max(cy, 0), h - 1)

    ff = th.copy()
    # Fix: flood-filling from a non-salient (zero) anchor fills the
    # *background* region with 255 and effectively inverts the mask; only
    # flood when the anchor actually lies inside a salient blob, otherwise
    # keep the thresholded map as-is.
    if ff[cy, cx] > 0:
        fill_mask = np.zeros((h + 2, w + 2), np.uint8)
        cv2.floodFill(ff, fill_mask, (cx, cy), 255, loDiff=5, upDiff=5, flags=4)

    ff = cv2.morphologyEx(ff, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8))
    return ff
|
| 837 |
+
|
| 838 |
# ============================================================================
|
| 839 |
# HELPER FUNCTIONS - REFINEMENT
|
| 840 |
# ============================================================================
|
|
|
|
| 1024 |
|
| 1025 |
except Exception as e:
|
| 1026 |
logger.error(f"Mask processing failed: {e}")
|
| 1027 |
+
h, w = mask.shape[:2] if (mask is not None and hasattr(mask, 'shape') and len(mask.shape) >= 2) else (256, 256)
|
| 1028 |
fallback = np.zeros((h, w), dtype=np.uint8)
|
| 1029 |
fallback[h//4:3*h//4, w//4:3*w//4] = 255
|
| 1030 |
return fallback
|
| 1031 |
|
| 1032 |
def _validate_mask_quality(mask: np.ndarray, image_shape: Tuple[int, int]) -> bool:
    """
    Validate that a mask meets basic quality criteria (soft reject policy).

    Hard rejects: implausible foreground area ratio, empty mask,
    zero-area image shape. Advisory only: unusual vertical center of mass
    (logged, never rejected). Returns True on internal errors so validation
    never blocks the pipeline.
    """
    try:
        h, w = image_shape
        total_area = h * w
        # Fix: a degenerate 0-area shape previously raised ZeroDivisionError,
        # which the except swallowed into a spurious "valid" result.
        if total_area == 0:
            logger.warning("Zero-area image shape in mask validation")
            return False

        mask_area = int(np.sum(mask > 127))
        area_ratio = mask_area / total_area
        if area_ratio < MIN_AREA_RATIO or area_ratio > MAX_AREA_RATIO:
            logger.warning(f"Suspicious mask area ratio: {area_ratio:.3f}")
            return False

        # Only the row coordinates are needed (the original also unpacked the
        # unused column coordinates).
        row_coords = np.nonzero(mask > 127)[0]
        if len(row_coords) == 0:
            logger.warning("Empty mask")
            return False

        center_y = np.mean(row_coords)
        # Advisory only (we no longer hard-reject based on center)
        if center_y < h * 0.08 or center_y > h * 0.98:
            logger.warning(f"Mask center unusual (advisory): y={center_y/h:.2f}")

        return True

    except Exception as e:
        logger.warning(f"Mask validation error: {e}")
        return True
|
| 1061 |
|
| 1062 |
def _fallback_segmentation(image: np.ndarray) -> np.ndarray:
|
| 1063 |
+
"""Legacy fallback segmentation; prefer _classical_segmentation_cascade"""
|
| 1064 |
try:
|
| 1065 |
logger.info("Using fallback segmentation strategy")
|
| 1066 |
h, w = image.shape[:2]
|
|
|
|
| 1087 |
except Exception as e:
|
| 1088 |
logger.warning(f"Background subtraction fallback failed: {e}")
|
| 1089 |
|
| 1090 |
+
# Geometric ellipse fallback
|
| 1091 |
+
mask = _geometric_person_mask(image)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1092 |
logger.info("Using geometric fallback mask")
|
| 1093 |
return mask
|
| 1094 |
|
|
|
|
| 1145 |
mask_binary = cv2.morphologyEx(mask_binary, cv2.MORPH_CLOSE, kernel)
|
| 1146 |
mask_binary = cv2.morphologyEx(mask_binary, cv2.MORPH_OPEN, kernel)
|
| 1147 |
|
| 1148 |
+
mask_smooth = cv2.GaussianBlur(mask_binary.astype(np.float32), (5, 5), 1.0) / 255.0
|
|
|
|
|
|
|
| 1149 |
mask_smooth = np.power(mask_smooth, 0.8)
|
| 1150 |
+
|
| 1151 |
mask_smooth = np.where(mask_smooth > 0.5,
|
| 1152 |
np.minimum(mask_smooth * 1.1, 1.0),
|
| 1153 |
mask_smooth * 0.9)
|
|
|
|
| 1226 |
# ============================================================================
|
| 1227 |
|
| 1228 |
def _create_solid_background(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
|
| 1229 |
+
"""Create solid color background (BGR)"""
|
| 1230 |
color_hex = bg_config["colors"][0].lstrip('#')
|
| 1231 |
color_rgb = tuple(int(color_hex[i:i+2], 16) for i in (0, 2, 4))
|
| 1232 |
color_bgr = color_rgb[::-1]
|
| 1233 |
return np.full((height, width, 3), color_bgr, dtype=np.uint8)
|
| 1234 |
|
| 1235 |
def _create_gradient_background_enhanced(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
|
| 1236 |
+
"""Create enhanced gradient background with better quality (BGR out)"""
|
| 1237 |
try:
|
| 1238 |
colors = bg_config["colors"]
|
| 1239 |
direction = bg_config.get("direction", "vertical")
|
|
|
|
| 1265 |
return np.full((height, width, 3), (128, 128, 128), dtype=np.uint8)
|
| 1266 |
|
| 1267 |
def _create_vertical_gradient(colors: list, width: int, height: int) -> np.ndarray:
    """Create a top-to-bottom gradient image (RGB, uint8)."""
    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    denom = max(1, height)
    for row in range(height):
        # One interpolated color per row, broadcast across the full width.
        canvas[row, :] = _interpolate_color(colors, row / denom)
    return canvas
|
| 1274 |
|
| 1275 |
def _create_horizontal_gradient(colors: list, width: int, height: int) -> np.ndarray:
    """Create a left-to-right gradient image (RGB, uint8)."""
    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    denom = max(1, width)
    for col in range(width):
        # One interpolated color per column, broadcast down the full height.
        canvas[:, col] = _interpolate_color(colors, col / denom)
    return canvas
|
| 1282 |
|
| 1283 |
def _create_diagonal_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
| 1284 |
+
"""Create diagonal gradient using vectorized operations (RGB)"""
|
| 1285 |
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
| 1286 |
max_distance = width + height
|
| 1287 |
+
progress = (x_coords + y_coords) / max(1, max_distance)
|
| 1288 |
progress = np.clip(progress, 0, 1)
|
| 1289 |
|
| 1290 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
|
|
|
| 1293 |
return gradient
|
| 1294 |
|
| 1295 |
def _create_radial_gradient(colors: list, width: int, height: int, soft: bool = False) -> np.ndarray:
|
| 1296 |
+
"""Create radial gradient using vectorized operations (RGB)"""
|
| 1297 |
center_x, center_y = width // 2, height // 2
|
| 1298 |
max_distance = np.sqrt(center_x**2 + center_y**2)
|
| 1299 |
|
| 1300 |
+
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
| 1301 |
+
distances = np.sqrt((x_coords - center_x)**2 + (y_coords - center_y)**2)
|
| 1302 |
+
progress = distances / max(1e-6, max_distance)
|
| 1303 |
progress = np.clip(progress, 0, 1)
|
| 1304 |
|
| 1305 |
if soft:
|
|
|
|
| 1308 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
| 1309 |
for c in range(3):
|
| 1310 |
gradient[:, :, c] = _vectorized_color_interpolation(colors, progress, c)
|
| 1311 |
+
|
| 1312 |
return gradient
|
| 1313 |
|
| 1314 |
def _vectorized_color_interpolation(colors: list, progress: np.ndarray, channel: int) -> np.ndarray:
|
| 1315 |
+
"""Vectorized color interpolation for performance"""
|
| 1316 |
if len(colors) == 1:
|
| 1317 |
return np.full_like(progress, colors[0][channel], dtype=np.uint8)
|
| 1318 |
|
|
|
|
| 1332 |
return np.clip(result, 0, 255).astype(np.uint8)
|
| 1333 |
|
| 1334 |
def _interpolate_color(colors: list, progress: float) -> tuple:
|
| 1335 |
+
"""Interpolate between multiple colors (RGB tuple)"""
|
| 1336 |
if len(colors) == 1:
|
| 1337 |
return colors[0]
|
| 1338 |
elif len(colors) == 2:
|
|
|
|
| 1343 |
else:
|
| 1344 |
segment = progress * (len(colors) - 1)
|
| 1345 |
idx = int(segment)
|
| 1346 |
+
local_progress = max(0.0, min(1.0, segment - idx))
|
| 1347 |
if idx >= len(colors) - 1:
|
| 1348 |
return colors[-1]
|
| 1349 |
c1, c2 = colors[idx], colors[idx + 1]
|
|
|
|
| 1353 |
return (r, g, b)
|
| 1354 |
|
| 1355 |
def _apply_background_adjustments(background: np.ndarray, bg_config: Dict[str, Any]) -> np.ndarray:
|
| 1356 |
+
"""Apply brightness and contrast adjustments to background"""
|
| 1357 |
try:
|
| 1358 |
brightness = bg_config.get("brightness", 1.0)
|
| 1359 |
contrast = bg_config.get("contrast", 1.0)
|