MogensR commited on
Commit
0a08e1e
·
1 Parent(s): 8168272

Update two_stage_processor.py

Browse files
Files changed (1) hide show
  1. two_stage_processor.py +120 -94
two_stage_processor.py CHANGED
@@ -12,6 +12,8 @@
12
  import logging
13
  from pathlib import Path
14
  import tempfile
 
 
15
 
16
  logger = logging.getLogger(__name__)
17
 
@@ -38,6 +40,7 @@ def _prog(pct: float, desc: str):
38
 
39
  cap = cv2.VideoCapture(video_path)
40
  if not cap.isOpened():
 
41
  return None, "Could not open video file"
42
 
43
  fps = cap.get(cv2.CAP_PROP_FPS)
@@ -49,9 +52,14 @@ def _prog(pct: float, desc: str):
49
  green_bg = np.zeros((height, width, 3), dtype=np.uint8)
50
  green_bg[:, :] = [0, 255, 0] # Pure green in BGR
51
 
52
- # Setup output
53
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
54
- out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
 
 
 
 
 
55
 
56
  # Storage for masks (for potential reuse)
57
  masks = []
@@ -87,6 +95,7 @@ def _prog(pct: float, desc: str):
87
  try:
88
  with open(mask_file, 'wb') as f:
89
  pickle.dump(masks, f)
 
90
  except Exception as e:
91
  logger.warning(f"Failed to save masks: {e}")
92
 
@@ -94,7 +103,7 @@ def _prog(pct: float, desc: str):
94
  return output_path, f"Green screen created: {frame_count} frames"
95
 
96
  except Exception as e:
97
- logger.error(f"Stage 1 error: {e}")
98
  return None, f"Stage 1 failed: {str(e)}"
99
 
100
  def stage2_greenscreen_to_final(self, greenscreen_path, background, output_path,
@@ -107,18 +116,14 @@ def _prog(pct: float, desc: str):
107
  progress_callback(pct, desc)
108
 
109
  if chroma_settings is None:
110
- chroma_settings = {
111
- 'key_color': [0, 255, 0], # Green in BGR
112
- 'tolerance': 40,
113
- 'edge_softness': 2,
114
- 'spill_suppression': 0.3
115
- }
116
 
117
  try:
118
  _prog(0.0, "Stage 2: Applying final background...")
119
 
120
  cap = cv2.VideoCapture(greenscreen_path)
121
  if not cap.isOpened():
 
122
  return None, "Could not open green screen video"
123
 
124
  fps = cap.get(cv2.CAP_PROP_FPS)
@@ -130,15 +135,21 @@ def _prog(pct: float, desc: str):
130
  if isinstance(background, str):
131
  bg = cv2.imread(background)
132
  if bg is None:
 
133
  return None, "Could not load background image"
134
  else:
135
  bg = background
136
 
137
  bg = cv2.resize(bg, (width, height))
138
 
139
- # Setup output
140
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
141
- out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
 
 
 
 
 
142
 
143
  frame_count = 0
144
 
@@ -163,13 +174,13 @@ def _prog(pct: float, desc: str):
163
  return output_path, f"Final video created: {frame_count} frames"
164
 
165
  except Exception as e:
166
- logger.error(f"Stage 2 error: {e}")
167
  return None, f"Stage 2 failed: {str(e)}"
168
 
169
  def _extract_person_mask(self, frame):
170
  """Extract person mask using SAM2"""
171
  if self.sam2_predictor is None:
172
- # Fallback mask
173
  h, w = frame.shape[:2]
174
  mask = np.zeros((h, w), dtype=np.uint8)
175
  mask[h//6:5*h//6, w//4:3*w//4] = 255
@@ -204,7 +215,7 @@ def _extract_person_mask(self, frame):
204
  return mask
205
 
206
  except Exception as e:
207
- logger.error(f"Mask extraction error: {e}")
208
  h, w = frame.shape[:2]
209
  mask = np.zeros((h, w), dtype=np.uint8)
210
  mask[h//6:5*h//6, w//4:3*w//4] = 255
@@ -217,96 +228,111 @@ def _refine_mask(self, frame, mask):
217
  return mask
218
 
219
  try:
220
- # MatAnyone refinement logic here
221
- # This would depend on your MatAnyone implementation
222
- return mask
223
- except:
224
- logger.warning("MatAnyone refinement failed, using original mask")
225
  return mask
226
 
227
  def _apply_greenscreen_hard(self, frame, mask, green_bg):
228
  """Apply green screen with hard edges for clean chroma keying"""
229
- # Binary threshold for clean edges
230
- _, mask_binary = cv2.threshold(mask, 140, 255, cv2.THRESH_BINARY)
231
-
232
- # No feathering - we want hard edges for chroma keying
233
- mask_3ch = cv2.cvtColor(mask_binary, cv2.COLOR_GRAY2BGR)
234
- mask_norm = mask_3ch.astype(float) / 255
235
-
236
- # Composite
237
- result = frame * mask_norm + green_bg * (1 - mask_norm)
238
- return result.astype(np.uint8)
 
 
 
 
239
 
240
  def _chroma_key_advanced(self, frame, background, settings):
241
- """
242
- Advanced chroma keying with spill suppression
243
- """
244
- key_color = np.array(settings['key_color'], dtype=np.uint8)
245
- tolerance = settings['tolerance']
246
- softness = settings['edge_softness']
247
- spill_suppress = settings['spill_suppression']
248
-
249
- # Convert to float for processing
250
- frame_float = frame.astype(np.float32)
251
- bg_float = background.astype(np.float32)
252
-
253
- # Calculate color distance from key color
254
- diff = np.abs(frame_float - key_color)
255
- distance = np.sqrt(np.sum(diff ** 2, axis=2))
256
-
257
- # Create mask based on distance
258
- mask = np.where(distance < tolerance, 0, 1)
259
-
260
- # Edge softening
261
- if softness > 0:
262
- mask = cv2.GaussianBlur(mask.astype(np.float32),
263
- (softness*2+1, softness*2+1),
264
- softness)
265
-
266
- # Spill suppression - reduce green in edges
267
- if spill_suppress > 0:
268
- green_channel = frame_float[:, :, 1]
269
- spill_mask = np.where(mask < 1, 1 - mask, 0)
270
- green_suppression = green_channel * spill_mask * spill_suppress
271
- frame_float[:, :, 1] -= green_suppression
272
- frame_float = np.clip(frame_float, 0, 255)
273
-
274
- # Expand mask to 3 channels
275
- mask_3ch = np.stack([mask] * 3, axis=2)
276
-
277
- # Composite
278
- result = frame_float * mask_3ch + bg_float * (1 - mask_3ch)
279
- return np.clip(result, 0, 255).astype(np.uint8)
 
 
280
 
281
  def process_full_pipeline(self, video_path, background, final_output,
282
  chroma_settings=None, progress_callback=None):
283
  """
284
  Run the complete two-stage pipeline
285
  """
286
- # Stage 1: Create green screen
287
- greenscreen_path = tempfile.mktemp(suffix='_greenscreen.mp4')
288
- gs_result, gs_msg = self.stage1_extract_to_greenscreen(
289
- video_path, greenscreen_path, progress_callback
290
- )
291
-
292
- if gs_result is None:
293
- return None, gs_msg
294
-
295
- # Stage 2: Apply final background
296
- final_result, final_msg = self.stage2_greenscreen_to_final(
297
- greenscreen_path, background, final_output,
298
- chroma_settings, progress_callback
299
- )
300
-
301
- # Cleanup
302
  try:
303
- os.remove(greenscreen_path)
304
- except:
305
- pass
306
-
307
- return final_result, final_msg
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
 
309
- # Chroma key settings presets
310
  CHROMA_PRESETS = {
311
  'standard': {
312
  'key_color': [0, 255, 0],
@@ -314,13 +340,13 @@ def process_full_pipeline(self, video_path, background, final_output,
314
  'edge_softness': 2,
315
  'spill_suppression': 0.3
316
  },
317
- 'tight': {
318
  'key_color': [0, 255, 0],
319
  'tolerance': 30,
320
  'edge_softness': 1,
321
  'spill_suppression': 0.4
322
  },
323
- 'soft': {
324
  'key_color': [0, 255, 0],
325
  'tolerance': 50,
326
  'edge_softness': 3,
 
12
  import logging
13
  from pathlib import Path
14
  import tempfile
15
+ import traceback
16
+ from utilities import refine_mask_hq # Import for MatAnyone refinement
17
 
18
  logger = logging.getLogger(__name__)
19
 
 
40
 
41
  cap = cv2.VideoCapture(video_path)
42
  if not cap.isOpened():
43
+ logger.error("Could not open video file")
44
  return None, "Could not open video file"
45
 
46
  fps = cap.get(cv2.CAP_PROP_FPS)
 
52
  green_bg = np.zeros((height, width, 3), dtype=np.uint8)
53
  green_bg[:, :] = [0, 255, 0] # Pure green in BGR
54
 
55
+ # Setup output using app.py's create_video_writer
56
+ from app import create_video_writer # Import here to avoid circular imports
57
+ out, actual_output_path = create_video_writer(output_path, fps, width, height)
58
+ if out is None:
59
+ cap.release()
60
+ logger.error("Could not create output video file")
61
+ return None, "Could not create output video file"
62
+ output_path = actual_output_path
63
 
64
  # Storage for masks (for potential reuse)
65
  masks = []
 
95
  try:
96
  with open(mask_file, 'wb') as f:
97
  pickle.dump(masks, f)
98
+ logger.info(f"Masks saved to {mask_file}")
99
  except Exception as e:
100
  logger.warning(f"Failed to save masks: {e}")
101
 
 
103
  return output_path, f"Green screen created: {frame_count} frames"
104
 
105
  except Exception as e:
106
+ logger.error(f"Stage 1 error: {e}\n{traceback.format_exc()}")
107
  return None, f"Stage 1 failed: {str(e)}"
108
 
109
  def stage2_greenscreen_to_final(self, greenscreen_path, background, output_path,
 
116
  progress_callback(pct, desc)
117
 
118
  if chroma_settings is None:
119
+ chroma_settings = CHROMA_PRESETS['standard']
 
 
 
 
 
120
 
121
  try:
122
  _prog(0.0, "Stage 2: Applying final background...")
123
 
124
  cap = cv2.VideoCapture(greenscreen_path)
125
  if not cap.isOpened():
126
+ logger.error("Could not open green screen video")
127
  return None, "Could not open green screen video"
128
 
129
  fps = cap.get(cv2.CAP_PROP_FPS)
 
135
  if isinstance(background, str):
136
  bg = cv2.imread(background)
137
  if bg is None:
138
+ logger.error("Could not load background image")
139
  return None, "Could not load background image"
140
  else:
141
  bg = background
142
 
143
  bg = cv2.resize(bg, (width, height))
144
 
145
+ # Setup output using app.py's create_video_writer
146
+ from app import create_video_writer
147
+ out, actual_output_path = create_video_writer(output_path, fps, width, height)
148
+ if out is None:
149
+ cap.release()
150
+ logger.error("Could not create output video file")
151
+ return None, "Could not create output video file"
152
+ output_path = actual_output_path
153
 
154
  frame_count = 0
155
 
 
174
  return output_path, f"Final video created: {frame_count} frames"
175
 
176
  except Exception as e:
177
+ logger.error(f"Stage 2 error: {e}\n{traceback.format_exc()}")
178
  return None, f"Stage 2 failed: {str(e)}"
179
 
180
  def _extract_person_mask(self, frame):
181
  """Extract person mask using SAM2"""
182
  if self.sam2_predictor is None:
183
+ logger.warning("SAM2 predictor not available, using fallback mask")
184
  h, w = frame.shape[:2]
185
  mask = np.zeros((h, w), dtype=np.uint8)
186
  mask[h//6:5*h//6, w//4:3*w//4] = 255
 
215
  return mask
216
 
217
  except Exception as e:
218
+ logger.error(f"Mask extraction error: {e}\n{traceback.format_exc()}")
219
  h, w = frame.shape[:2]
220
  mask = np.zeros((h, w), dtype=np.uint8)
221
  mask[h//6:5*h//6, w//4:3*w//4] = 255
 
228
  return mask
229
 
230
  try:
231
+ refined_mask = refine_mask_hq(frame, mask, self.matanyone_model)
232
+ logger.info("MatAnyone mask refinement successful")
233
+ return refined_mask
234
+ except Exception as e:
235
+ logger.warning(f"MatAnyone refinement failed: {e}\n{traceback.format_exc()}")
236
  return mask
237
 
238
  def _apply_greenscreen_hard(self, frame, mask, green_bg):
239
  """Apply green screen with hard edges for clean chroma keying"""
240
+ try:
241
+ # Binary threshold for clean edges
242
+ _, mask_binary = cv2.threshold(mask, 140, 255, cv2.THRESH_BINARY)
243
+
244
+ # No feathering - we want hard edges for chroma keying
245
+ mask_3ch = cv2.cvtColor(mask_binary, cv2.COLOR_GRAY2BGR)
246
+ mask_norm = mask_3ch.astype(float) / 255
247
+
248
+ # Composite
249
+ result = frame * mask_norm + green_bg * (1 - mask_norm)
250
+ return result.astype(np.uint8)
251
+ except Exception as e:
252
+ logger.error(f"Greenscreen application error: {e}\n{traceback.format_exc()}")
253
+ return frame
254
 
255
  def _chroma_key_advanced(self, frame, background, settings):
256
+ """Advanced chroma keying with spill suppression"""
257
+ try:
258
+ key_color = np.array(settings['key_color'], dtype=np.uint8)
259
+ tolerance = settings['tolerance']
260
+ softness = settings['edge_softness']
261
+ spill_suppress = settings['spill_suppression']
262
+
263
+ # Convert to float for processing
264
+ frame_float = frame.astype(np.float32)
265
+ bg_float = background.astype(np.float32)
266
+
267
+ # Calculate color distance from key color
268
+ diff = np.abs(frame_float - key_color)
269
+ distance = np.sqrt(np.sum(diff ** 2, axis=2))
270
+
271
+ # Create mask based on distance
272
+ mask = np.where(distance < tolerance, 0, 1)
273
+
274
+ # Edge softening
275
+ if softness > 0:
276
+ mask = cv2.GaussianBlur(mask.astype(np.float32),
277
+ (softness*2+1, softness*2+1),
278
+ softness)
279
+
280
+ # Spill suppression - reduce green in edges
281
+ if spill_suppress > 0:
282
+ green_channel = frame_float[:, :, 1]
283
+ spill_mask = np.where(mask < 1, 1 - mask, 0)
284
+ green_suppression = green_channel * spill_mask * spill_suppress
285
+ frame_float[:, :, 1] -= green_suppression
286
+ frame_float = np.clip(frame_float, 0, 255)
287
+
288
+ # Expand mask to 3 channels
289
+ mask_3ch = np.stack([mask] * 3, axis=2)
290
+
291
+ # Composite
292
+ result = frame_float * mask_3ch + bg_float * (1 - mask_3ch)
293
+ return np.clip(result, 0, 255).astype(np.uint8)
294
+ except Exception as e:
295
+ logger.error(f"Chroma keying error: {e}\n{traceback.format_exc()}")
296
+ return frame
297
 
298
  def process_full_pipeline(self, video_path, background, final_output,
299
  chroma_settings=None, progress_callback=None):
300
  """
301
  Run the complete two-stage pipeline
302
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
303
  try:
304
+ # Stage 1: Create green screen
305
+ greenscreen_path = tempfile.mktemp(suffix='_greenscreen.mp4')
306
+ gs_result, gs_msg = self.stage1_extract_to_greenscreen(
307
+ video_path, greenscreen_path, progress_callback
308
+ )
309
+
310
+ if gs_result is None:
311
+ logger.error(f"Stage 1 failed: {gs_msg}")
312
+ return None, gs_msg
313
+
314
+ # Stage 2: Apply final background
315
+ final_result, final_msg = self.stage2_greenscreen_to_final(
316
+ greenscreen_path, background, final_output,
317
+ chroma_settings, progress_callback
318
+ )
319
+
320
+ # Cleanup
321
+ try:
322
+ os.remove(greenscreen_path)
323
+ except Exception as e:
324
+ logger.warning(f"Failed to clean up greenscreen file: {e}")
325
+
326
+ if final_result is None:
327
+ logger.error(f"Stage 2 failed: {final_msg}")
328
+ return None, final_msg
329
+
330
+ return final_result, final_msg
331
+ except Exception as e:
332
+ logger.error(f"Full pipeline error: {e}\n{traceback.format_exc()}")
333
+ return None, f"Full pipeline failed: {str(e)}"
334
 
335
+ # Chroma key settings presets (aligned with app.py and ui_components.py)
336
  CHROMA_PRESETS = {
337
  'standard': {
338
  'key_color': [0, 255, 0],
 
340
  'edge_softness': 2,
341
  'spill_suppression': 0.3
342
  },
343
+ 'studio': {
344
  'key_color': [0, 255, 0],
345
  'tolerance': 30,
346
  'edge_softness': 1,
347
  'spill_suppression': 0.4
348
  },
349
+ 'outdoor': {
350
  'key_color': [0, 255, 0],
351
  'tolerance': 50,
352
  'edge_softness': 3,