MogensR commited on
Commit
5155650
·
1 Parent(s): d7cf15f

Delete utils/utilities.py

Browse files
Files changed (1) hide show
  1. utils/utilities.py +0 -1281
utils/utilities.py DELETED
@@ -1,1281 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Enhanced utilities.py - Core computer vision functions with auto-best quality
4
- VERSION: 2.0-auto-best
5
- ROLLBACK: Set USE_ENHANCED_SEGMENTATION = False to revert to original behavior
6
- """
7
-
8
- import os
9
- import cv2
10
- import numpy as np
11
- import torch
12
- from PIL import Image, ImageDraw
13
- import logging
14
- import time
15
- from typing import Optional, Dict, Any, Tuple, List
16
- from pathlib import Path
17
-
# ============================================================================
# VERSION CONTROL AND FEATURE FLAGS - EASY ROLLBACK
# ============================================================================

# ROLLBACK CONTROL: Set to False to use original functions
USE_ENHANCED_SEGMENTATION = True       # gates the new pipeline in segment_person_hq
USE_AUTO_TEMPORAL_CONSISTENCY = True   # NOTE(review): not referenced anywhere in this file's visible code — confirm it is read elsewhere
USE_INTELLIGENT_PROMPTING = True       # saliency-driven prompt generation (_segment_with_intelligent_prompts)
USE_ITERATIVE_REFINEMENT = True        # quality-driven re-prompting loop (_auto_refine_mask_iteratively)

# Logging
# NOTE(review): basicConfig at import time configures the process-wide root
# logger, which is usually undesirable in library modules — confirm intended.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Professional background templates (unchanged)
# Schema per entry:
#   name                  - display name shown to the user
#   type                  - "gradient" or "color"
#   colors                - hex color stops (a single color for type "color")
#   direction             - gradient direction (gradient entries only)
#   chroma_key            - True only on the green-screen entry
#   description           - human-readable summary
#   brightness / contrast - post-processing multipliers applied by the renderer
#                           (presumably — the renderer is not in this file)
PROFESSIONAL_BACKGROUNDS = {
    "office_modern": {
        "name": "Modern Office",
        "type": "gradient",
        "colors": ["#f8f9fa", "#e9ecef", "#dee2e6"],
        "direction": "diagonal",
        "description": "Clean, contemporary office environment",
        "brightness": 0.95,
        "contrast": 1.1
    },
    "studio_blue": {
        "name": "Professional Blue",
        "type": "gradient",
        "colors": ["#1e3c72", "#2a5298", "#3498db"],
        "direction": "radial",
        "description": "Broadcast-quality blue studio",
        "brightness": 0.9,
        "contrast": 1.2
    },
    "studio_green": {
        "name": "Broadcast Green",
        "type": "color",
        "colors": ["#00b894"],
        "chroma_key": True,
        "description": "Professional green screen replacement",
        "brightness": 1.0,
        "contrast": 1.0
    },
    "minimalist": {
        "name": "Minimalist White",
        "type": "gradient",
        "colors": ["#ffffff", "#f1f2f6", "#ddd"],
        "direction": "soft_radial",
        "description": "Clean, minimal background",
        "brightness": 0.98,
        "contrast": 0.9
    },
    "warm_gradient": {
        "name": "Warm Sunset",
        "type": "gradient",
        "colors": ["#ff7675", "#fd79a8", "#fdcb6e"],
        "direction": "diagonal",
        "description": "Warm, inviting atmosphere",
        "brightness": 0.85,
        "contrast": 1.15
    },
    "tech_dark": {
        "name": "Tech Dark",
        "type": "gradient",
        "colors": ["#0c0c0c", "#2d3748", "#4a5568"],
        "direction": "vertical",
        "description": "Modern tech/gaming setup",
        "brightness": 0.7,
        "contrast": 1.3
    },
    "corporate_blue": {
        "name": "Corporate Blue",
        "type": "gradient",
        "colors": ["#667eea", "#764ba2", "#f093fb"],
        "direction": "diagonal",
        "description": "Professional corporate background",
        "brightness": 0.88,
        "contrast": 1.1
    },
    "nature_blur": {
        "name": "Soft Nature",
        "type": "gradient",
        "colors": ["#a8edea", "#fed6e3", "#d299c2"],
        "direction": "radial",
        "description": "Soft blurred nature effect",
        "brightness": 0.92,
        "contrast": 0.95
    }
}
# Exceptions (unchanged)
class SegmentationError(Exception):
    """Custom exception for segmentation failures (raised by segment_person_hq and helpers)."""
    pass

class MaskRefinementError(Exception):
    """Custom exception for mask refinement failures (raised by refine_mask_hq)."""
    pass

class BackgroundReplacementError(Exception):
    """Custom exception for background replacement failures (raised by replace_background_hq)."""
    pass
# ============================================================================
# ENHANCED SEGMENTATION FUNCTIONS - NEW AUTO-BEST VERSION
# ============================================================================

def segment_person_hq(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
    """
    ENHANCED VERSION 2.0: High-quality person segmentation with intelligent automation

    ROLLBACK: Set USE_ENHANCED_SEGMENTATION = False to revert to original behavior

    Pipeline: feature-flag dispatch -> predictor validation -> SAM2 prompting
    (intelligent or basic, per USE_INTELLIGENT_PROMPTING) -> optional
    iterative refinement (per USE_ITERATIVE_REFINEMENT) -> quality
    validation.  Every failure point either falls back to heuristic
    segmentation (fallback_enabled=True) or raises SegmentationError.

    Args:
        image: Input image (H, W, 3)
        predictor: SAM2 predictor instance
        fallback_enabled: Whether to use fallback segmentation if AI fails

    Returns:
        Binary mask (H, W) with values 0-255

    Raises:
        SegmentationError: On invalid input, or on any failure when
            fallback_enabled is False.
    """
    # Flag check first, so the legacy path is byte-for-byte the old behavior
    if not USE_ENHANCED_SEGMENTATION:
        return segment_person_hq_original(image, predictor, fallback_enabled)

    logger.debug("Using ENHANCED segmentation with intelligent automation")

    # Invalid input always raises, even with fallback enabled
    if image is None or image.size == 0:
        raise SegmentationError("Invalid input image")

    try:
        # Validate predictor
        if predictor is None:
            if fallback_enabled:
                logger.warning("SAM2 predictor not available, using fallback")
                return _fallback_segmentation(image)
            else:
                raise SegmentationError("SAM2 predictor not available")

        # Set image for prediction
        try:
            predictor.set_image(image)
        except Exception as e:
            logger.error(f"Failed to set image in predictor: {e}")
            if fallback_enabled:
                return _fallback_segmentation(image)
            else:
                raise SegmentationError(f"Predictor setup failed: {e}")

        # ENHANCED: Intelligent automatic prompt generation
        if USE_INTELLIGENT_PROMPTING:
            mask = _segment_with_intelligent_prompts(image, predictor)
        else:
            mask = _segment_with_basic_prompts(image, predictor)

        # ENHANCED: Iterative refinement
        if USE_ITERATIVE_REFINEMENT and mask is not None:
            mask = _auto_refine_mask_iteratively(image, mask, predictor)

        # Validate mask quality
        if not _validate_mask_quality(mask, image.shape[:2]):
            logger.warning("Mask quality validation failed")
            if fallback_enabled:
                return _fallback_segmentation(image)
            else:
                raise SegmentationError("Poor mask quality")

        logger.debug(f"Enhanced segmentation successful - mask range: {mask.min()}-{mask.max()}")
        return mask

    except SegmentationError:
        # Deliberate failures propagate unchanged (already handled above)
        raise
    except Exception as e:
        logger.error(f"Unexpected segmentation error: {e}")
        if fallback_enabled:
            return _fallback_segmentation(image)
        else:
            raise SegmentationError(f"Unexpected error: {e}")
def _segment_with_intelligent_prompts(image: np.ndarray, predictor: Any) -> np.ndarray:
    """NEW: Intelligent automatic prompt generation.

    Builds content-aware positive/negative point prompts via
    _generate_smart_prompts, runs SAM2 multimask prediction, picks the
    highest-scoring mask, and normalizes it with _process_mask.

    Args:
        image: Input image (H, W, 3); predictor.set_image must already have
            been called by the caller.
        predictor: SAM2 predictor instance.

    Returns:
        Binary uint8 mask (H, W).

    Raises:
        SegmentationError: When the predictor returns no masks; any other
            exception is logged and re-raised for the caller's fallback logic.
    """
    try:
        h, w = image.shape[:2]

        # Generate content-aware prompts
        pos_points, neg_points = _generate_smart_prompts(image)

        if len(pos_points) == 0:
            # Fallback to center point
            pos_points = np.array([[w//2, h//2]], dtype=np.float32)

        # Combine points and labels (1 = foreground, 0 = background)
        points = np.vstack([pos_points, neg_points])
        labels = np.hstack([
            np.ones(len(pos_points), dtype=np.int32),
            np.zeros(len(neg_points), dtype=np.int32)
        ])

        logger.debug(f"Using {len(pos_points)} positive, {len(neg_points)} negative points")

        # Perform prediction
        with torch.no_grad():
            masks, scores, _ = predictor.predict(
                point_coords=points,
                point_labels=labels,
                multimask_output=True
            )

        if masks is None or len(masks) == 0:
            raise SegmentationError("No masks generated")

        # Select best mask by the predictor's own confidence score
        if scores is not None and len(scores) > 0:
            best_idx = np.argmax(scores)
            best_mask = masks[best_idx]
            logger.debug(f"Selected mask {best_idx} with score {scores[best_idx]:.3f}")
        else:
            best_mask = masks[0]

        return _process_mask(best_mask)

    except Exception as e:
        logger.error(f"Intelligent prompting failed: {e}")
        raise
242
- def _generate_smart_prompts(image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
243
- """NEW: Generate optimal positive/negative points automatically"""
244
- try:
245
- h, w = image.shape[:2]
246
-
247
- # Method 1: Saliency-based point placement
248
- try:
249
- saliency = cv2.saliency.StaticSaliencySpectralResidual_create()
250
- success, saliency_map = saliency.computeSaliency(image)
251
-
252
- if success:
253
- # Find high-saliency regions
254
- saliency_thresh = cv2.threshold(saliency_map, 0.7, 1, cv2.THRESH_BINARY)[1]
255
- contours, _ = cv2.findContours((saliency_thresh * 255).astype(np.uint8),
256
- cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
257
-
258
- positive_points = []
259
- if contours:
260
- # Get center points of largest salient regions
261
- for contour in sorted(contours, key=cv2.contourArea, reverse=True)[:3]:
262
- M = cv2.moments(contour)
263
- if M["m00"] != 0:
264
- cx = int(M["m10"] / M["m00"])
265
- cy = int(M["m01"] / M["m00"])
266
- # Ensure points are within image bounds
267
- if 0 < cx < w and 0 < cy < h:
268
- positive_points.append([cx, cy])
269
-
270
- if positive_points:
271
- logger.debug(f"Generated {len(positive_points)} saliency-based points")
272
- positive_points = np.array(positive_points, dtype=np.float32)
273
- else:
274
- raise Exception("No valid saliency points found")
275
-
276
- except Exception as e:
277
- logger.debug(f"Saliency method failed: {e}, using fallback")
278
- # Method 2: Fallback to strategic grid points
279
- positive_points = np.array([
280
- [w//2, h//3], # Upper body
281
- [w//2, h//2], # Center torso
282
- [w//2, 2*h//3], # Lower body
283
- ], dtype=np.float32)
284
-
285
- # Always place negative points in corners and edges (likely background)
286
- negative_points = np.array([
287
- [10, 10], # Top-left corner
288
- [w-10, 10], # Top-right corner
289
- [10, h-10], # Bottom-left corner
290
- [w-10, h-10], # Bottom-right corner
291
- [w//2, 5], # Top center edge
292
- [w//2, h-5], # Bottom center edge
293
- ], dtype=np.float32)
294
-
295
- return positive_points, negative_points
296
-
297
- except Exception as e:
298
- logger.warning(f"Smart prompt generation failed: {e}")
299
- # Ultimate fallback
300
- h, w = image.shape[:2]
301
- positive_points = np.array([[w//2, h//2]], dtype=np.float32)
302
- negative_points = np.array([[10, 10], [w-10, 10]], dtype=np.float32)
303
- return positive_points, negative_points
304
-
def _auto_refine_mask_iteratively(image: np.ndarray, initial_mask: np.ndarray,
                                  predictor: Any, max_iterations: int = 2) -> np.ndarray:
    """NEW: Automatically refine mask based on quality assessment.

    Loop: score the mask; if below 0.85, locate edge disagreements, turn them
    into corrective point prompts, and re-run SAM2 seeded with the current
    mask.  A refined mask is only kept when it scores strictly better.  Any
    failure returns the best mask so far (or the initial mask).

    Args:
        image: Input image (H, W, 3).
        initial_mask: uint8 mask (H, W) from the first segmentation pass.
        predictor: SAM2 predictor (set_image already called).
        max_iterations: Maximum number of refinement rounds.

    Returns:
        Refined (or original) uint8 mask.
    """
    try:
        current_mask = initial_mask.copy()
        h, w = image.shape[:2]  # NOTE(review): unused in this function

        for iteration in range(max_iterations):
            # Analyze mask quality
            quality_score = _assess_mask_quality(current_mask, image)
            logger.debug(f"Iteration {iteration}: quality score = {quality_score:.3f}")

            if quality_score > 0.85:  # Good enough
                logger.debug(f"Quality sufficient after {iteration} iterations")
                break

            # Identify problem areas
            problem_areas = _find_mask_errors(current_mask, image)

            if np.any(problem_areas):
                # Generate corrective prompts
                corrective_points, corrective_labels = _generate_corrective_prompts(
                    image, current_mask, problem_areas
                )

                if len(corrective_points) > 0:
                    # Re-run SAM2 with additional prompts
                    try:
                        with torch.no_grad():
                            masks, scores, _ = predictor.predict(
                                point_coords=corrective_points,
                                point_labels=corrective_labels,
                                mask_input=current_mask[None, :, :],  # Add batch dimension
                                # NOTE(review): SAM-style predictors typically expect
                                # mask_input as low-res logits (e.g. 1x256x256), not a
                                # full-resolution 0-255 mask — confirm against the
                                # predictor API.
                                multimask_output=False
                            )

                        if masks is not None and len(masks) > 0:
                            refined_mask = _process_mask(masks[0])

                            # Only use refined mask if it's actually better
                            if _assess_mask_quality(refined_mask, image) > quality_score:
                                current_mask = refined_mask
                                logger.debug(f"Improved mask in iteration {iteration}")
                            else:
                                logger.debug(f"Refinement didn't improve quality in iteration {iteration}")
                                break

                    except Exception as e:
                        logger.debug(f"Refinement iteration {iteration} failed: {e}")
                        break
            else:
                logger.debug("No problem areas detected")
                break

        return current_mask

    except Exception as e:
        logger.warning(f"Iterative refinement failed: {e}")
        return initial_mask
def _assess_mask_quality(mask: np.ndarray, image: np.ndarray) -> float:
    """NEW: Assess mask quality automatically.

    Weighted combination of four heuristics: foreground area ratio (0.3),
    centeredness (0.2), edge smoothness (0.3), connectivity (0.2).

    Args:
        mask: uint8 mask (H, W); pixels > 127 count as foreground.
        image: Source image, used only for its dimensions.

    Returns:
        Score in [0, 1]; 0.5 (neutral) on internal failure.
    """
    try:
        h, w = image.shape[:2]

        # Quality factors
        scores = []

        # 1. Area ratio (person should be 5-80% of image)
        mask_area = np.sum(mask > 127)
        total_area = h * w
        area_ratio = mask_area / total_area

        if 0.05 <= area_ratio <= 0.8:
            area_score = 1.0
        elif area_ratio < 0.05:
            area_score = area_ratio / 0.05  # linear ramp up to the 5% floor
        else:
            area_score = max(0, 1.0 - (area_ratio - 0.8) / 0.2)  # ramp down above 80%
        scores.append(area_score)

        # 2. Centeredness (person should be roughly centered)
        mask_binary = mask > 127
        if np.any(mask_binary):
            mask_center_y, mask_center_x = np.where(mask_binary)
            center_y = np.mean(mask_center_y) / h
            center_x = np.mean(mask_center_x) / w

            # NOTE(review): min() means a mask centered on EITHER axis scores
            # high even if far off-center on the other — max() may be intended.
            center_score = 1.0 - min(abs(center_x - 0.5), abs(center_y - 0.5))
            scores.append(center_score)
        else:
            scores.append(0.0)

        # 3. Edge smoothness (fewer mask-edge pixels = smoother outline)
        edges = cv2.Canny(mask, 50, 150)
        edge_density = np.sum(edges > 0) / total_area
        smoothness_score = max(0, 1.0 - edge_density * 10)  # Penalize too many edges
        scores.append(smoothness_score)

        # 4. Connectivity (prefer single connected component)
        num_labels, _ = cv2.connectedComponents(mask)
        connectivity_score = max(0, 1.0 - (num_labels - 2) * 0.2)  # -2 because background is label 0
        scores.append(connectivity_score)

        # Weighted average (order must match the appends above)
        weights = [0.3, 0.2, 0.3, 0.2]
        overall_score = np.average(scores, weights=weights)

        return overall_score

    except Exception as e:
        logger.warning(f"Quality assessment failed: {e}")
        return 0.5  # Neutral score
419
- def _find_mask_errors(mask: np.ndarray, image: np.ndarray) -> np.ndarray:
420
- """NEW: Identify problematic areas in mask"""
421
- try:
422
- # Find areas with high gradient that might need correction
423
- gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
424
-
425
- # Edge detection on original image
426
- edges = cv2.Canny(gray, 50, 150)
427
-
428
- # Mask edges
429
- mask_edges = cv2.Canny(mask, 50, 150)
430
-
431
- # Find discrepancy between image edges and mask edges
432
- edge_discrepancy = cv2.bitwise_xor(edges, mask_edges)
433
-
434
- # Dilate to create error regions
435
- kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
436
- error_regions = cv2.dilate(edge_discrepancy, kernel, iterations=1)
437
-
438
- return error_regions > 0
439
-
440
- except Exception as e:
441
- logger.warning(f"Error detection failed: {e}")
442
- return np.zeros_like(mask, dtype=bool)
443
-
444
- def _generate_corrective_prompts(image: np.ndarray, mask: np.ndarray,
445
- problem_areas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
446
- """NEW: Generate corrective prompts based on problem areas"""
447
- try:
448
- # Find centers of problem regions
449
- contours, _ = cv2.findContours(problem_areas.astype(np.uint8),
450
- cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
451
-
452
- corrective_points = []
453
- corrective_labels = []
454
-
455
- for contour in contours:
456
- if cv2.contourArea(contour) > 100: # Ignore tiny regions
457
- M = cv2.moments(contour)
458
- if M["m00"] != 0:
459
- cx = int(M["m10"] / M["m00"])
460
- cy = int(M["m01"] / M["m00"])
461
-
462
- # Determine if this should be positive or negative
463
- # Sample the current mask at this point
464
- current_mask_value = mask[cy, cx]
465
-
466
- # If mask says background but image has strong edges, add positive point
467
- # If mask says foreground but area looks like background, add negative point
468
- if current_mask_value < 127:
469
- # Currently background, maybe should be foreground
470
- corrective_points.append([cx, cy])
471
- corrective_labels.append(1) # Positive
472
- else:
473
- # Currently foreground, maybe should be background
474
- corrective_points.append([cx, cy])
475
- corrective_labels.append(0) # Negative
476
-
477
- return (np.array(corrective_points, dtype=np.float32) if corrective_points else np.array([]).reshape(0, 2),
478
- np.array(corrective_labels, dtype=np.int32) if corrective_labels else np.array([], dtype=np.int32))
479
-
480
- except Exception as e:
481
- logger.warning(f"Corrective prompt generation failed: {e}")
482
- return np.array([]).reshape(0, 2), np.array([], dtype=np.int32)
483
-
def _segment_with_basic_prompts(image: np.ndarray, predictor: Any) -> np.ndarray:
    """FALLBACK: Original basic prompting method.

    Segments with a fixed grid of point prompts: three positives down the
    vertical center line, four negatives at the corners.  Used when
    USE_INTELLIGENT_PROMPTING is disabled.

    Args:
        image: Input image (H, W, 3).
        predictor: SAM2 predictor (set_image must already have been called).

    Returns:
        Binary uint8 mask (H, W) via _process_mask.

    Raises:
        SegmentationError: If the predictor returns no masks.
    """
    h, w = image.shape[:2]

    # Original strategic points with negative prompts added
    positive_points = np.array([
        [w//2, h//3],    # Head area
        [w//2, h//2],    # Torso center
        [w//2, 2*h//3],  # Lower body
    ], dtype=np.float32)

    negative_points = np.array([
        [w//10, h//10],      # Top-left corner (background)
        [9*w//10, h//10],    # Top-right corner (background)
        [w//10, 9*h//10],    # Bottom-left corner (background)
        [9*w//10, 9*h//10],  # Bottom-right corner (background)
    ], dtype=np.float32)

    # Combine points.  FIX: derive the labels from the point counts instead
    # of the hard-coded [1,1,1,0,0,0,0] list so the two arrays can never
    # drift out of sync (and to match _segment_with_intelligent_prompts).
    points = np.vstack([positive_points, negative_points])
    labels = np.hstack([
        np.ones(len(positive_points), dtype=np.int32),
        np.zeros(len(negative_points), dtype=np.int32),
    ])

    # Perform prediction
    with torch.no_grad():
        masks, scores, _ = predictor.predict(
            point_coords=points,
            point_labels=labels,
            multimask_output=True
        )

    if masks is None or len(masks) == 0:
        raise SegmentationError("No masks generated")

    # Select best mask based on score
    best_idx = np.argmax(scores) if scores is not None and len(scores) > 0 else 0
    best_mask = masks[best_idx]

    return _process_mask(best_mask)
# ============================================================================
# ORIGINAL FUNCTION PRESERVED FOR ROLLBACK
# ============================================================================

def segment_person_hq_original(image: np.ndarray, predictor: Any, fallback_enabled: bool = True) -> np.ndarray:
    """
    ORIGINAL VERSION: Preserved for rollback capability.

    Kept deliberately unchanged so USE_ENHANCED_SEGMENTATION = False restores
    the exact pre-2.0 behavior — do not modify this function.

    Args:
        image: Input image (H, W, 3)
        predictor: SAM2 predictor instance
        fallback_enabled: Whether to use fallback segmentation if AI fails

    Returns:
        Binary mask (H, W) with values 0-255

    Raises:
        SegmentationError: On invalid input, or on any failure when
            fallback_enabled is False.
    """
    if image is None or image.size == 0:
        raise SegmentationError("Invalid input image")

    try:
        # Validate predictor
        if predictor is None:
            if fallback_enabled:
                logger.warning("SAM2 predictor not available, using fallback")
                return _fallback_segmentation(image)
            else:
                raise SegmentationError("SAM2 predictor not available")

        # Set image for prediction
        try:
            predictor.set_image(image)
        except Exception as e:
            logger.error(f"Failed to set image in predictor: {e}")
            if fallback_enabled:
                return _fallback_segmentation(image)
            else:
                raise SegmentationError(f"Predictor setup failed: {e}")

        h, w = image.shape[:2]

        # Enhanced strategic point placement for better person detection
        # (all positive prompts — no negative/background points in this version)
        points = np.array([
            [w//2, h//4],      # Head center
            [w//2, h//2],      # Torso center
            [w//2, 3*h//4],    # Lower body
            [w//3, h//2],      # Left side
            [2*w//3, h//2],    # Right side
            [w//2, h//6],      # Upper head
            [w//4, 2*h//3],    # Left leg area
            [3*w//4, 2*h//3],  # Right leg area
        ], dtype=np.float32)

        labels = np.ones(len(points), dtype=np.int32)

        # Perform prediction with error handling
        try:
            with torch.no_grad():
                masks, scores, _ = predictor.predict(
                    point_coords=points,
                    point_labels=labels,
                    multimask_output=True
                )
        except Exception as e:
            logger.error(f"SAM2 prediction failed: {e}")
            if fallback_enabled:
                return _fallback_segmentation(image)
            else:
                raise SegmentationError(f"Prediction failed: {e}")

        # Validate prediction results
        if masks is None or len(masks) == 0:
            logger.warning("SAM2 returned no masks")
            if fallback_enabled:
                return _fallback_segmentation(image)
            else:
                raise SegmentationError("No masks generated")

        if scores is None or len(scores) == 0:
            logger.warning("SAM2 returned no scores")
            best_mask = masks[0]
        else:
            # Select best mask based on score
            best_idx = np.argmax(scores)
            best_mask = masks[best_idx]
            logger.debug(f"Selected mask {best_idx} with score {scores[best_idx]:.3f}")

        # Process mask to ensure correct format
        mask = _process_mask(best_mask)

        # Validate mask quality
        if not _validate_mask_quality(mask, image.shape[:2]):
            logger.warning("Mask quality validation failed")
            if fallback_enabled:
                return _fallback_segmentation(image)
            else:
                raise SegmentationError("Poor mask quality")

        logger.debug(f"Segmentation successful - mask range: {mask.min()}-{mask.max()}")
        return mask

    except SegmentationError:
        raise
    except Exception as e:
        logger.error(f"Unexpected segmentation error: {e}")
        if fallback_enabled:
            return _fallback_segmentation(image)
        else:
            raise SegmentationError(f"Unexpected error: {e}")
624
- # ============================================================================
625
- # EXISTING FUNCTIONS PRESERVED (unchanged for rollback safety)
626
- # ============================================================================
627
-
628
- def _process_mask(mask: np.ndarray) -> np.ndarray:
629
- """Process raw mask to ensure correct format and range"""
630
- try:
631
- # Handle different input formats
632
- if len(mask.shape) > 2:
633
- mask = mask.squeeze()
634
-
635
- if len(mask.shape) > 2:
636
- mask = mask[:, :, 0] if mask.shape[2] > 0 else mask.sum(axis=2)
637
-
638
- # Ensure proper data type and range
639
- if mask.dtype == bool:
640
- mask = mask.astype(np.uint8) * 255
641
- elif mask.dtype == np.float32 or mask.dtype == np.float64:
642
- if mask.max() <= 1.0:
643
- mask = (mask * 255).astype(np.uint8)
644
- else:
645
- mask = np.clip(mask, 0, 255).astype(np.uint8)
646
- else:
647
- mask = mask.astype(np.uint8)
648
-
649
- # Post-process for cleaner edges
650
- kernel = np.ones((3, 3), np.uint8)
651
- mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
652
- mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
653
-
654
- # Ensure binary threshold
655
- _, mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
656
-
657
- return mask
658
-
659
- except Exception as e:
660
- logger.error(f"Mask processing failed: {e}")
661
- # Return a basic fallback mask
662
- h, w = mask.shape[:2] if len(mask.shape) >= 2 else (256, 256)
663
- fallback = np.zeros((h, w), dtype=np.uint8)
664
- fallback[h//4:3*h//4, w//4:3*w//4] = 255
665
- return fallback
666
-
667
- def _validate_mask_quality(mask: np.ndarray, image_shape: Tuple[int, int]) -> bool:
668
- """Validate that the mask meets quality criteria"""
669
- try:
670
- h, w = image_shape
671
- mask_area = np.sum(mask > 127)
672
- total_area = h * w
673
-
674
- # Check if mask area is reasonable (5% to 80% of image)
675
- area_ratio = mask_area / total_area
676
- if area_ratio < 0.05 or area_ratio > 0.8:
677
- logger.warning(f"Suspicious mask area ratio: {area_ratio:.3f}")
678
- return False
679
-
680
- # Check if mask is not just a blob in corner
681
- mask_binary = mask > 127
682
- mask_center_y, mask_center_x = np.where(mask_binary)
683
-
684
- if len(mask_center_y) == 0:
685
- logger.warning("Empty mask")
686
- return False
687
-
688
- center_y = np.mean(mask_center_y)
689
- center_x = np.mean(mask_center_x)
690
-
691
- # Person should be roughly centered
692
- if center_y < h * 0.2 or center_y > h * 0.9:
693
- logger.warning(f"Mask center too far from expected person location: y={center_y/h:.2f}")
694
- return False
695
-
696
- return True
697
-
698
- except Exception as e:
699
- logger.warning(f"Mask validation error: {e}")
700
- return True # Default to accepting mask if validation fails
701
-
702
- def _fallback_segmentation(image: np.ndarray) -> np.ndarray:
703
- """Fallback segmentation when AI models fail"""
704
- try:
705
- logger.info("Using fallback segmentation strategy")
706
- h, w = image.shape[:2]
707
-
708
- # Try background subtraction approach
709
- try:
710
- # Simple background subtraction
711
- gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
712
-
713
- # Assume background is around the edges
714
- edge_pixels = np.concatenate([
715
- gray[0, :], gray[-1, :], gray[:, 0], gray[:, -1]
716
- ])
717
- bg_color = np.median(edge_pixels)
718
-
719
- # Create mask based on difference from background
720
- diff = np.abs(gray.astype(float) - bg_color)
721
- mask = (diff > 30).astype(np.uint8) * 255
722
-
723
- # Morphological operations to clean up
724
- kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
725
- mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
726
- mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
727
-
728
- # If mask looks reasonable, use it
729
- if _validate_mask_quality(mask, image.shape[:2]):
730
- logger.info("Background subtraction fallback successful")
731
- return mask
732
-
733
- except Exception as e:
734
- logger.warning(f"Background subtraction fallback failed: {e}")
735
-
736
- # Simple geometric fallback
737
- mask = np.zeros((h, w), dtype=np.uint8)
738
-
739
- # Create an elliptical mask in center assuming person location
740
- center_x, center_y = w // 2, h // 2
741
- radius_x, radius_y = w // 3, h // 2.5
742
-
743
- y, x = np.ogrid[:h, :w]
744
- mask_ellipse = ((x - center_x) / radius_x) ** 2 + ((y - center_y) / radius_y) ** 2 <= 1
745
- mask[mask_ellipse] = 255
746
-
747
- logger.info("Using geometric fallback mask")
748
- return mask
749
-
750
- except Exception as e:
751
- logger.error(f"All fallback strategies failed: {e}")
752
- # Last resort: simple center rectangle
753
- h, w = image.shape[:2]
754
- mask = np.zeros((h, w), dtype=np.uint8)
755
- mask[h//6:5*h//6, w//4:3*w//4] = 255
756
- return mask
757
-
# ============================================================================
# ALL OTHER EXISTING FUNCTIONS REMAIN UNCHANGED FOR ROLLBACK SAFETY
# ============================================================================

def refine_mask_hq(image: np.ndarray, mask: np.ndarray, matanyone_processor: Any,
                   fallback_enabled: bool = True) -> np.ndarray:
    """
    Enhanced mask refinement with MatAnyone and robust fallbacks
    UNCHANGED for rollback safety

    Order of attempts: normalize the mask, try MatAnyone (only kept if it
    passes quality validation), then fall back to the OpenCV refinement
    pipeline when allowed.

    Args:
        image: Input image (H, W, 3).
        mask: Raw mask to refine.
        matanyone_processor: MatAnyone instance, or None to skip straight to
            the OpenCV fallback.
        fallback_enabled: Whether OpenCV refinement may be used when
            MatAnyone fails or is unavailable.

    Returns:
        Refined uint8 mask.

    Raises:
        MaskRefinementError: On invalid input, or when MatAnyone fails and
            fallback is disabled.
    """
    if image is None or mask is None:
        raise MaskRefinementError("Invalid input image or mask")

    try:
        # Ensure mask is in correct format
        mask = _process_mask(mask)

        # Try MatAnyone if available
        if matanyone_processor is not None:
            try:
                logger.debug("Attempting MatAnyone refinement")
                refined_mask = _matanyone_refine(image, mask, matanyone_processor)

                if refined_mask is not None and _validate_mask_quality(refined_mask, image.shape[:2]):
                    logger.debug("MatAnyone refinement successful")
                    return refined_mask
                else:
                    logger.warning("MatAnyone produced poor quality mask")

            except Exception as e:
                logger.warning(f"MatAnyone refinement failed: {e}")

        # Fallback to enhanced OpenCV refinement
        if fallback_enabled:
            logger.debug("Using enhanced OpenCV refinement")
            return enhance_mask_opencv_advanced(image, mask)
        else:
            raise MaskRefinementError("MatAnyone failed and fallback disabled")

    except MaskRefinementError:
        raise
    except Exception as e:
        logger.error(f"Unexpected mask refinement error: {e}")
        if fallback_enabled:
            return enhance_mask_opencv_advanced(image, mask)
        else:
            raise MaskRefinementError(f"Unexpected error: {e}")
806
- def _matanyone_refine(image: np.ndarray, mask: np.ndarray, processor: Any) -> Optional[np.ndarray]:
807
- """Attempt MatAnyone mask refinement - Python 3.10 compatible"""
808
- try:
809
- # Different possible MatAnyone interfaces
810
- if hasattr(processor, 'infer'):
811
- refined_mask = processor.infer(image, mask)
812
- elif hasattr(processor, 'process'):
813
- refined_mask = processor.process(image, mask)
814
- elif callable(processor):
815
- refined_mask = processor(image, mask)
816
- else:
817
- logger.warning("Unknown MatAnyone interface")
818
- return None
819
-
820
- if refined_mask is None:
821
- return None
822
-
823
- # Process the refined mask
824
- refined_mask = _process_mask(refined_mask)
825
-
826
- logger.debug("MatAnyone refinement successful")
827
- return refined_mask
828
-
829
- except Exception as e:
830
- logger.warning(f"MatAnyone processing error: {e}")
831
- return None
832
-
def enhance_mask_opencv_advanced(image: np.ndarray, mask: np.ndarray) -> np.ndarray:
    """Advanced OpenCV-based mask enhancement with multiple techniques.

    Five-stage pipeline: bilateral filter -> guided-filter smoothing ->
    morphological close/open -> Gaussian smoothing -> hard binary threshold.

    Args:
        image: Guide image (H, W, 3) for edge-aware smoothing.
        mask: Mask to refine (grayscale or BGR, 0-1 or 0-255).

    Returns:
        Refined uint8 mask with values {0, 255} on success.
        NOTE(review): the exception fallback returns a Gaussian-blurred copy
        of the input mask, which is NOT binarized (and keeps the input's
        channel count) — unlike the main path; confirm callers tolerate this.
    """
    try:
        if len(mask.shape) == 3:
            mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)

        # Ensure proper range (treat [0, 1] masks as normalized)
        if mask.max() <= 1.0:
            mask = (mask * 255).astype(np.uint8)

        # Multi-stage refinement

        # 1. Bilateral filtering for edge preservation
        refined_mask = cv2.bilateralFilter(mask, 9, 75, 75)

        # 2. Edge-aware smoothing using guided filter approximation
        refined_mask = _guided_filter_approx(image, refined_mask, radius=8, eps=0.2)

        # 3. Morphological operations for structure
        kernel_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        refined_mask = cv2.morphologyEx(refined_mask, cv2.MORPH_CLOSE, kernel_close)

        kernel_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
        refined_mask = cv2.morphologyEx(refined_mask, cv2.MORPH_OPEN, kernel_open)

        # 4. Final smoothing
        refined_mask = cv2.GaussianBlur(refined_mask, (3, 3), 0.8)

        # 5. Ensure binary output
        _, refined_mask = cv2.threshold(refined_mask, 127, 255, cv2.THRESH_BINARY)

        return refined_mask

    except Exception as e:
        logger.warning(f"Enhanced OpenCV refinement failed: {e}")
        # Simple fallback
        return cv2.GaussianBlur(mask, (5, 5), 1.0)
871
def _guided_filter_approx(guide: np.ndarray, mask: np.ndarray, radius: int = 8, eps: float = 0.2) -> np.ndarray:
    """Approximation of the guided filter (He et al.) for edge-aware mask smoothing.

    Smooths *mask* while preserving edges that exist in *guide*, using the
    classic box-filter formulation of the guided filter. The local linear
    model ``output = a * guide + b`` is fit per window, so the smoothed mask
    follows intensity edges in the guide image instead of blurring across them.

    Args:
        guide: Guide image (BGR or already-grayscale uint8 array).
        mask: Mask to smooth (uint8, 0-255).
        radius: Half-size of the box-filter window; window is ``2*radius + 1``.
        eps: Regularization added to the guide variance; larger values smooth more.

    Returns:
        Smoothed mask as uint8 in [0, 255]. On any failure the original
        *mask* is returned unchanged (best-effort behavior).
    """
    try:
        # Work in grayscale float [0, 1]; collapse a BGR guide first.
        guide_gray = cv2.cvtColor(guide, cv2.COLOR_BGR2GRAY) if len(guide.shape) == 3 else guide
        guide_gray = guide_gray.astype(np.float32) / 255.0
        mask_float = mask.astype(np.float32) / 255.0

        # Box filter approximation
        kernel_size = 2 * radius + 1

        # Mean filters (ddepth=-1 keeps the float32 input depth).
        mean_guide = cv2.boxFilter(guide_gray, -1, (kernel_size, kernel_size))
        mean_mask = cv2.boxFilter(mask_float, -1, (kernel_size, kernel_size))
        corr_guide_mask = cv2.boxFilter(guide_gray * mask_float, -1, (kernel_size, kernel_size))

        # Covariance of (guide, mask) and variance of guide per window:
        # cov = E[gm] - E[g]E[m], var = E[g^2] - E[g]^2.
        cov_guide_mask = corr_guide_mask - mean_guide * mean_mask
        mean_guide_sq = cv2.boxFilter(guide_gray * guide_gray, -1, (kernel_size, kernel_size))
        var_guide = mean_guide_sq - mean_guide * mean_guide

        # Linear coefficients of output = a * guide + b; eps prevents
        # division blow-up in flat (low-variance) regions.
        a = cov_guide_mask / (var_guide + eps)
        b = mean_mask - a * mean_guide

        # Average the coefficients over each window before applying them,
        # which is what gives the filter its edge-preserving smoothing.
        mean_a = cv2.boxFilter(a, -1, (kernel_size, kernel_size))
        mean_b = cv2.boxFilter(b, -1, (kernel_size, kernel_size))

        output = mean_a * guide_gray + mean_b
        output = np.clip(output * 255, 0, 255).astype(np.uint8)

        return output

    except Exception as e:
        logger.warning(f"Guided filter approximation failed: {e}")
        return mask
def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.ndarray,
                          fallback_enabled: bool = True) -> np.ndarray:
    """Replace the background behind the masked foreground of *frame*.

    The background is resized to the frame, the mask is normalized to a
    single-channel uint8 0-255 matte, and advanced compositing is attempted
    with a simple alpha-blend fallback.

    Args:
        frame: Source BGR frame (uint8).
        mask: Foreground mask — 1- or 3-channel, uint8 0-255 or float 0-1.
        background: Replacement background image (any size; resized to frame).
        fallback_enabled: If True, fall back to simple compositing on failure
            instead of raising.

    Returns:
        Composited BGR frame (uint8).

    Raises:
        BackgroundReplacementError: If any input is None, or compositing
            fails and *fallback_enabled* is False.
    """
    if frame is None or mask is None or background is None:
        raise BackgroundReplacementError("Invalid input frame, mask, or background")

    try:
        # Resize background to match frame (Lanczos for quality).
        background = cv2.resize(background, (frame.shape[1], frame.shape[0]),
                                interpolation=cv2.INTER_LANCZOS4)

        # Collapse a 3-channel mask to grayscale.
        if len(mask.shape) == 3:
            mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)

        # BUGFIX: scale normalized masks *before* casting to uint8. The
        # previous order cast a float mask in [0, 1] to uint8 first, which
        # truncated every soft alpha value to 0 or 1 and destroyed edge
        # feathering before the 0-255 rescale.
        if mask.dtype != np.uint8:
            if mask.max() <= 1.0:
                logger.debug("Converting normalized mask to 0-255 range")
                mask = mask * 255.0
            mask = np.clip(mask, 0, 255).astype(np.uint8)
        elif mask.max() <= 1.0:
            # uint8 mask that only contains 0/1 values — rescale to 0/255.
            logger.debug("Converting normalized mask to 0-255 range")
            mask = (mask * 255).astype(np.uint8)

        # Enhanced compositing with multiple techniques.
        try:
            result = _advanced_compositing(frame, mask, background)
            logger.debug("Advanced compositing successful")
            return result

        except Exception as e:
            logger.warning(f"Advanced compositing failed: {e}")
            if fallback_enabled:
                return _simple_compositing(frame, mask, background)
            else:
                raise BackgroundReplacementError(f"Advanced compositing failed: {e}")

    except BackgroundReplacementError:
        raise
    except Exception as e:
        logger.error(f"Unexpected background replacement error: {e}")
        if fallback_enabled:
            return _simple_compositing(frame, mask, background)
        else:
            raise BackgroundReplacementError(f"Unexpected error: {e}")
def _advanced_compositing(frame: np.ndarray, mask: np.ndarray, background: np.ndarray) -> np.ndarray:
    """High-quality compositor: cleaned matte, feathered alpha, edge color match.

    Args:
        frame: Foreground BGR frame (uint8).
        mask: Single-channel uint8 matte, 0-255.
        background: Background image already resized to the frame.

    Returns:
        Composited BGR frame (uint8).

    Raises:
        Exception: Re-raises any failure so the caller can fall back.
    """
    try:
        # Binarize with a permissive threshold (100) so thin foreground
        # regions survive into the matte.
        _, matte = cv2.threshold(mask, 100, 255, cv2.THRESH_BINARY)

        # Structural cleanup: close small holes, then open away speckles.
        ellipse = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        matte = cv2.morphologyEx(matte, cv2.MORPH_CLOSE, ellipse)
        matte = cv2.morphologyEx(matte, cv2.MORPH_OPEN, ellipse)

        # Feather the edges, then gamma-shape the alpha for softer blending.
        alpha = cv2.GaussianBlur(matte.astype(np.float32), (5, 5), 1.0) / 255.0
        alpha = np.power(alpha, 0.8)

        # Push confident foreground toward 1 and background toward 0 to
        # tighten the transition band.
        alpha = np.where(alpha > 0.5,
                         np.minimum(alpha * 1.1, 1.0),
                         alpha * 0.9)

        # Subtle color matching at the matte edges reduces halos.
        fg = _color_match_edges(frame, background, alpha)

        # Alpha-blend in float, then clamp back to uint8.
        alpha3 = np.stack([alpha] * 3, axis=2)
        blended = fg.astype(np.float32) * alpha3 + background.astype(np.float32) * (1 - alpha3)
        return np.clip(blended, 0, 255).astype(np.uint8)

    except Exception as e:
        logger.error(f"Advanced compositing error: {e}")
        raise
def _color_match_edges(frame: np.ndarray, background: np.ndarray, alpha: np.ndarray) -> np.ndarray:
    """Nudge foreground colors toward the background along matte edges.

    Only pixels inside the alpha transition band are shifted, and only by a
    small fraction, to suppress color halos at the composite boundary.

    Args:
        frame: Foreground BGR frame (uint8).
        background: Background image, same size as frame.
        alpha: Float alpha matte in [0, 1].

    Returns:
        Color-adjusted frame (uint8); original frame on failure or when no
        edge band is found.
    """
    try:
        # Transition band: pixels where the alpha gradient is non-trivial.
        gradient = np.abs(cv2.Sobel(alpha, cv2.CV_64F, 1, 1, ksize=3))
        band = gradient > 0.1
        if not band.any():
            return frame

        fg = frame.astype(np.float32)
        bg = background.astype(np.float32)

        # Keep the shift subtle — anything stronger bleeds visibly.
        strength = 0.1
        shifted = fg * (1 - strength) + bg * strength

        # Apply the shift only inside the band (broadcast over channels).
        result = np.where(band[:, :, np.newaxis], shifted, fg)
        return np.clip(result, 0, 255).astype(np.uint8)

    except Exception as e:
        logger.warning(f"Color matching failed: {e}")
        return frame
def _simple_compositing(frame: np.ndarray, mask: np.ndarray, background: np.ndarray) -> np.ndarray:
    """Fallback compositor: hard binary matte plus a plain alpha blend.

    Used when advanced compositing fails. As a last resort the original
    frame is returned untouched.
    """
    try:
        logger.info("Using simple compositing fallback")

        # Match background size to the frame.
        bg = cv2.resize(background, (frame.shape[1], frame.shape[0]))

        # Normalize the mask to single-channel uint8 0-255.
        m = mask
        if len(m.shape) == 3:
            m = cv2.cvtColor(m, cv2.COLOR_BGR2GRAY)
        if m.max() <= 1.0:
            m = (m * 255).astype(np.uint8)

        # Hard binary matte — no feathering in the fallback path.
        _, hard = cv2.threshold(m, 127, 255, cv2.THRESH_BINARY)

        # Blend: frame where matte is set, background elsewhere.
        alpha = np.stack([hard.astype(np.float32) / 255.0] * 3, axis=2)
        blended = frame * alpha + bg * (1 - alpha)
        return blended.astype(np.uint8)

    except Exception as e:
        logger.error(f"Simple compositing failed: {e}")
        # Last resort: hand back the unmodified frame.
        return frame
def create_professional_background(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
    """Build a background canvas from a template config.

    Dispatches on ``bg_config["type"]`` ("color" or "gradient"), then applies
    the template's brightness/contrast adjustments. Any failure — or an
    unknown type — yields a neutral gray canvas.

    Args:
        bg_config: Template dict (see PROFESSIONAL_BACKGROUNDS).
        width: Canvas width in pixels.
        height: Canvas height in pixels.

    Returns:
        BGR uint8 background image of shape (height, width, 3).
    """
    neutral = (128, 128, 128)
    try:
        kind = bg_config["type"]
        if kind == "color":
            canvas = _create_solid_background(bg_config, width, height)
        elif kind == "gradient":
            canvas = _create_gradient_background_enhanced(bg_config, width, height)
        else:
            # Unknown template type — neutral gray fallback.
            canvas = np.full((height, width, 3), neutral, dtype=np.uint8)

        return _apply_background_adjustments(canvas, bg_config)

    except Exception as e:
        logger.error(f"Background creation error: {e}")
        return np.full((height, width, 3), neutral, dtype=np.uint8)
- def _create_solid_background(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
1080
- """Create solid color background"""
1081
- color_hex = bg_config["colors"][0].lstrip('#')
1082
- color_rgb = tuple(int(color_hex[i:i+2], 16) for i in (0, 2, 4))
1083
- color_bgr = color_rgb[::-1]
1084
- return np.full((height, width, 3), color_bgr, dtype=np.uint8)
1085
-
1086
def _create_gradient_background_enhanced(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
    """Render a gradient background from a template config.

    Colors are parsed from "#rrggbb" strings, the gradient is built in RGB,
    and the result is converted to BGR for OpenCV. Unknown directions fall
    back to a vertical gradient; failures yield neutral gray.
    """
    try:
        direction = bg_config.get("direction", "vertical")

        # Parse hex color strings into RGB tuples.
        palette = []
        for entry in bg_config["colors"]:
            raw = entry.lstrip('#')
            palette.append(tuple(int(raw[i:i + 2], 16) for i in (0, 2, 4)))
        if not palette:
            palette = [(128, 128, 128)]

        if direction == "horizontal":
            canvas = _create_horizontal_gradient(palette, width, height)
        elif direction == "diagonal":
            canvas = _create_diagonal_gradient(palette, width, height)
        elif direction in ("radial", "soft_radial"):
            canvas = _create_radial_gradient(palette, width, height, direction == "soft_radial")
        else:
            # "vertical" and anything unrecognized both render vertically.
            canvas = _create_vertical_gradient(palette, width, height)

        return cv2.cvtColor(canvas, cv2.COLOR_RGB2BGR)

    except Exception as e:
        logger.error(f"Gradient creation error: {e}")
        return np.full((height, width, 3), (128, 128, 128), dtype=np.uint8)
def _create_vertical_gradient(colors: list, width: int, height: int) -> np.ndarray:
    """Build an RGB gradient that sweeps top-to-bottom through *colors*."""
    gradient = np.zeros((height, width, 3), dtype=np.uint8)

    # One interpolation per row; each row is a solid color.
    for y, row in enumerate(gradient):
        row[:] = _interpolate_color(colors, y / height)

    return gradient
def _create_horizontal_gradient(colors: list, width: int, height: int) -> np.ndarray:
    """Build an RGB gradient that sweeps left-to-right through *colors*."""
    gradient = np.zeros((height, width, 3), dtype=np.uint8)

    # One interpolation per column; each column is a solid color.
    for x in range(width):
        gradient[:, x] = _interpolate_color(colors, x / width)

    return gradient
def _create_diagonal_gradient(colors: list, width: int, height: int) -> np.ndarray:
    """Build an RGB gradient sweeping diagonally (top-left to bottom-right)."""
    # Progress is the normalized Manhattan offset from the top-left corner.
    rows, cols = np.mgrid[0:height, 0:width]
    t = np.clip((cols + rows) / (width + height), 0, 1)

    # Interpolate each channel over the whole progress map at once.
    channels = [_vectorized_color_interpolation(colors, t, c) for c in range(3)]
    return np.stack(channels, axis=2)
def _create_radial_gradient(colors: list, width: int, height: int, soft: bool = False) -> np.ndarray:
    """Build an RGB gradient radiating out from the canvas center.

    With ``soft=True`` the progress curve is eased (power 0.7), widening the
    central color region.
    """
    cx, cy = width // 2, height // 2
    # Distance from center to a corner normalizes progress into [0, 1].
    reach = np.sqrt(cx ** 2 + cy ** 2)

    rows, cols = np.mgrid[0:height, 0:width]
    t = np.clip(np.sqrt((cols - cx) ** 2 + (rows - cy) ** 2) / reach, 0, 1)

    if soft:
        t = np.power(t, 0.7)

    # Interpolate each channel over the whole progress map at once.
    channels = [_vectorized_color_interpolation(colors, t, c) for c in range(3)]
    return np.stack(channels, axis=2)
- def _vectorized_color_interpolation(colors: list, progress: np.ndarray, channel: int) -> np.ndarray:
1177
- """Vectorized color interpolation for performance"""
1178
- if len(colors) == 1:
1179
- return np.full_like(progress, colors[0][channel], dtype=np.uint8)
1180
-
1181
- num_segments = len(colors) - 1
1182
- segment_progress = progress * num_segments
1183
- segment_indices = np.floor(segment_progress).astype(int)
1184
- segment_indices = np.clip(segment_indices, 0, num_segments - 1)
1185
- local_progress = segment_progress - segment_indices
1186
-
1187
- # Get start and end colors for each pixel
1188
- start_colors = np.array([colors[i][channel] for i in range(len(colors))])
1189
- end_colors = np.array([colors[min(i + 1, len(colors) - 1)][channel] for i in range(len(colors))])
1190
-
1191
- start_vals = start_colors[segment_indices]
1192
- end_vals = end_colors[segment_indices]
1193
-
1194
- result = start_vals + (end_vals - start_vals) * local_progress
1195
- return np.clip(result, 0, 255).astype(np.uint8)
1196
-
1197
- def _interpolate_color(colors: list, progress: float) -> tuple:
1198
- """Interpolate between multiple colors"""
1199
- if len(colors) == 1:
1200
- return colors[0]
1201
- elif len(colors) == 2:
1202
- r = int(colors[0][0] + (colors[1][0] - colors[0][0]) * progress)
1203
- g = int(colors[0][1] + (colors[1][1] - colors[0][1]) * progress)
1204
- b = int(colors[0][2] + (colors[1][2] - colors[0][2]) * progress)
1205
- return (r, g, b)
1206
- else:
1207
- segment = progress * (len(colors) - 1)
1208
- idx = int(segment)
1209
- local_progress = segment - idx
1210
- if idx >= len(colors) - 1:
1211
- return colors[-1]
1212
- c1, c2 = colors[idx], colors[idx + 1]
1213
- r = int(c1[0] + (c2[0] - c1[0]) * local_progress)
1214
- g = int(c1[1] + (c2[1] - c1[1]) * local_progress)
1215
- b = int(c1[2] + (c2[2] - c1[2]) * local_progress)
1216
- return (r, g, b)
1217
-
1218
- def _apply_background_adjustments(background: np.ndarray, bg_config: Dict[str, Any]) -> np.ndarray:
1219
- """Apply brightness and contrast adjustments to background"""
1220
- try:
1221
- brightness = bg_config.get("brightness", 1.0)
1222
- contrast = bg_config.get("contrast", 1.0)
1223
-
1224
- if brightness != 1.0 or contrast != 1.0:
1225
- background = background.astype(np.float32)
1226
- background = background * contrast * brightness
1227
- background = np.clip(background, 0, 255).astype(np.uint8)
1228
-
1229
- return background
1230
-
1231
- except Exception as e:
1232
- logger.warning(f"Background adjustment failed: {e}")
1233
- return background
1234
-
1235
def validate_video_file(video_path: str) -> Tuple[bool, str]:
    """Validate a video file for processing.

    Checks existence, file-size limits, OpenCV decodability, and sane
    stream properties (frame count, fps, resolution, duration).

    Args:
        video_path: Path to the video file on disk.

    Returns:
        (is_valid, message): message explains the failure, or summarizes
        the stream (resolution, fps, duration) on success.
    """
    if not video_path or not os.path.exists(video_path):
        return False, "Video file not found"

    try:
        # Cheap filesystem checks before touching the decoder.
        file_size = os.path.getsize(video_path)
        if file_size == 0:
            return False, "Video file is empty"

        if file_size > 2 * 1024 * 1024 * 1024:  # 2GB limit
            return False, "Video file too large (>2GB)"

        # Probe the stream with OpenCV.
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return False, "Cannot open video file"

        # BUGFIX: release the capture in a finally block — previously a
        # failing property read would leak the capture handle.
        try:
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            cap.release()

        # Validation checks. <= 0 also rejects the negative frame counts
        # some containers report for unknown-length streams.
        if frame_count <= 0:
            return False, "Video appears to be empty (0 frames)"

        if fps <= 0 or fps > 120:
            return False, f"Invalid frame rate: {fps}"

        if width <= 0 or height <= 0:
            return False, f"Invalid resolution: {width}x{height}"

        if width > 4096 or height > 4096:
            return False, f"Resolution too high: {width}x{height} (max 4096x4096)"

        duration = frame_count / fps
        if duration > 300:  # 5 minutes
            return False, f"Video too long: {duration:.1f}s (max 300s)"

        return True, f"Valid video: {width}x{height}, {fps:.1f}fps, {duration:.1f}s"

    except Exception as e:
        return False, f"Error validating video: {str(e)}"