MogensR committed on
Commit
157d197
·
1 Parent(s): 32853be

Update utils/__init__.py

Browse files
Files changed (1) hide show
  1. utils/__init__.py +159 -570
utils/__init__.py CHANGED
@@ -1,575 +1,164 @@
1
  #!/usr/bin/env python3
2
  """
3
- BackgroundFX Pro - CSP-Safe Application Entry Point
4
- Now with: live background preview + sources: Preset / Upload / Gradient / AI Generate
5
- - Lazy-loaded Diffusers pipeline (VRAM-aware: sd-turbo / sdxl-turbo / sd-2.1 CPU)
6
- - Preview shows the exact background used
7
- - Clears stale AI image when switching sources
8
  """
9
 
10
- import early_env # <<< must be FIRST
11
-
12
- import os, time, math
13
- from typing import Optional, Dict, Any, Callable, Tuple
14
-
15
- # Prefer a writable cache in constrained environments (e.g., HF Spaces)
16
- os.environ.setdefault("HF_HOME", "/tmp/hf")
17
- os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
18
-
19
- # 1) CSP-safe Gradio env
20
- os.environ['GRADIO_ALLOW_FLAGGING'] = 'never'
21
- os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
22
- os.environ['GRADIO_SERVER_NAME'] = '0.0.0.0'
23
- os.environ['GRADIO_SERVER_PORT'] = '7860'
24
-
25
- # 2) Gradio schema patch
26
- try:
27
- import gradio_client.utils as gc_utils
28
- _orig_get_type = gc_utils.get_type
29
- def _patched_get_type(schema):
30
- if not isinstance(schema, dict):
31
- if isinstance(schema, bool): return "boolean"
32
- if isinstance(schema, str): return "string"
33
- if isinstance(schema, (int, float)): return "number"
34
- return "string"
35
- return _orig_get_type(schema)
36
- gc_utils.get_type = _patched_get_type
37
- except Exception:
38
- pass
39
-
40
- # 3) Logging early
41
- from utils.logging_setup import setup_logging, make_logger
42
- setup_logging(app_name="backgroundfx")
43
- logger = make_logger("entrypoint")
44
- logger.info("Entrypoint starting…")
45
-
46
- # 4) Imports
47
- from core.exceptions import ModelLoadingError, VideoProcessingError
48
- from config.app_config import get_config
49
- from utils.hardware.device_manager import DeviceManager
50
- from utils.system.memory_manager import MemoryManager
51
- from models.loaders.model_loader import ModelLoader
52
- from processing.video.video_processor import CoreVideoProcessor, ProcessorConfig
53
- from processing.audio.audio_processor import AudioProcessor
54
-
55
- # Background helpers (kept lightweight to avoid cycles)
56
- from utils import PROFESSIONAL_BACKGROUNDS, validate_video_file, create_professional_background
57
- # Gradient helper (add to utils; fallback here for preview only if missing)
58
- try:
59
- from utils import create_gradient_background
60
- except Exception:
61
- def create_gradient_background(spec: Dict[str, Any], width: int, height: int):
62
- # Lightweight fallback (linear+rotate only)
63
- import numpy as np
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  import cv2
65
- def _to_rgb(c):
66
- if isinstance(c, (list, tuple)) and len(c) == 3:
67
- return tuple(int(x) for x in c)
68
- if isinstance(c, str) and c.startswith("#") and len(c) == 7:
69
- return tuple(int(c[i:i+2], 16) for i in (1,3,5))
70
- return (255, 255, 255)
71
- start = _to_rgb(spec.get("start", "#222222"))
72
- end = _to_rgb(spec.get("end", "#888888"))
73
- angle = float(spec.get("angle_deg", 0))
74
- bg = np.zeros((height, width, 3), np.uint8)
75
- for y in range(height):
76
- t = y / max(1, height - 1)
77
- r = int(start[0] * (1 - t) + end[0] * t)
78
- g = int(start[1] * (1 - t) + end[1] * t)
79
- b = int(start[2] * (1 - t) + end[2] * t)
80
- bg[y, :] = (r, g, b)
81
- center = (width / 2, height / 2)
82
- rot = cv2.getRotationMatrix2D(center, angle, 1.0)
83
- return cv2.warpAffine(bg, rot, (width, height), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101)
84
-
85
- # 5) CSP-safe fallbacks for models
86
- class CSPSafeSAM2:
87
- def set_image(self, image):
88
- self.shape = getattr(image, 'shape', (512, 512, 3))
89
- def predict(self, point_coords=None, point_labels=None, box=None, multimask_output=True, **kwargs):
90
- import numpy as np
91
- h, w = self.shape[:2] if hasattr(self, 'shape') else (512, 512)
92
- n = 3 if multimask_output else 1
93
- return np.ones((n, h, w), dtype=bool), np.array([0.9, 0.8, 0.7][:n]), np.ones((n, h, w), dtype=np.float32)
94
-
95
- class CSPSafeMatAnyone:
96
- def step(self, image_tensor, mask_tensor=None, objects=None, first_frame_pred=False, **kwargs):
97
- import torch
98
- if hasattr(image_tensor, "shape"):
99
- if len(image_tensor.shape) == 3:
100
- _, H, W = image_tensor.shape
101
- elif len(image_tensor.shape) == 4:
102
- _, _, H, W = image_tensor.shape
103
- else:
104
- H, W = 256, 256
105
- else:
106
- H, W = 256, 256
107
- return torch.ones((1, 1, H, W))
108
- def output_prob_to_mask(self, output_prob):
109
- return (output_prob > 0.5).float()
110
- def process(self, image, mask, **kwargs):
111
- return mask
112
-
113
- # ---------- helpers for UI ----------
114
- import numpy as np
115
- import cv2
116
- from PIL import Image
117
- from typing import Tuple
118
-
119
- PREVIEW_W, PREVIEW_H = 640, 360 # 16:9
120
-
121
- def _hex_to_rgb(x: str) -> Tuple[int, int, int]:
122
- x = (x or "").strip()
123
- if x.startswith("#") and len(x) == 7:
124
- return tuple(int(x[i:i+2], 16) for i in (1, 3, 5))
125
- return (255, 255, 255)
126
-
127
- def _np_to_pil(arr: np.ndarray) -> Image.Image:
128
- if arr.dtype != np.uint8:
129
- arr = arr.clip(0, 255).astype(np.uint8)
130
- return Image.fromarray(arr)
131
-
132
- def _div8(n: int) -> int:
133
- # Ensure sizes are multiples of 8 for SD/VAEs (min 256)
134
- n = int(n)
135
- if n < 256: n = 256
136
- return int(math.floor(n / 8.0) * 8)
137
-
138
- # ---------- main app ----------
139
- class VideoBackgroundApp:
140
- def __init__(self):
141
- self.config = get_config()
142
- self.device_mgr = DeviceManager()
143
- self.memory_mgr = MemoryManager(self.device_mgr.get_optimal_device())
144
- self.model_loader = ModelLoader(self.device_mgr, self.memory_mgr)
145
- self.audio_proc = AudioProcessor()
146
- self.models_loaded = False
147
- self.core_processor: Optional[CoreVideoProcessor] = None
148
- # Text-to-image cache
149
- self.t2i_pipe = None
150
- self.t2i_model_id = None
151
- logger.info("VideoBackgroundApp initialized (device=%s)", self.device_mgr.get_optimal_device())
152
-
153
- def load_models(self, progress_callback: Optional[Callable] = None) -> str:
154
- logger.info("Loading models (CSP-safe)…")
155
- try:
156
- sam2, matanyone = self.model_loader.load_all_models(progress_callback=progress_callback)
157
- except Exception as e:
158
- logger.warning("Model loading failed (%s) - Using CSP-safe fallbacks", e)
159
- sam2, matanyone = None, None
160
-
161
- sam2_model = getattr(sam2, "model", sam2) if sam2 else CSPSafeSAM2()
162
- matanyone_model = getattr(matanyone, "model", matanyone) if matanyone else CSPSafeMatAnyone()
163
-
164
- cfg = ProcessorConfig(
165
- background_preset="office",
166
- write_fps=None,
167
- max_model_size=1280,
168
- use_nvenc=True,
169
- nvenc_codec="h264",
170
- nvenc_preset="p5",
171
- nvenc_cq=18,
172
- nvenc_tune_hq=True,
173
- nvenc_pix_fmt="yuv420p",
174
- )
175
- self.core_processor = CoreVideoProcessor(config=cfg, models=None)
176
- self.core_processor.models = type('FakeModelManager', (), {
177
- 'get_sam2': lambda self_: sam2_model,
178
- 'get_matanyone': lambda self_: matanyone_model
179
- })()
180
-
181
- self.models_loaded = True
182
- logger.info("Models ready (SAM2=%s, MatAnyOne=%s)",
183
- type(sam2_model).__name__, type(matanyone_model).__name__)
184
- return "Models loaded (CSP-safe; fallbacks in use if actual AI models failed)."
185
-
186
- # ---- PREVIEWS ----
187
- def preview_preset(self, preset_key: str) -> Image.Image:
188
- key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
189
- bg = create_professional_background(key, PREVIEW_W, PREVIEW_H) # RGB
190
- return _np_to_pil(bg)
191
-
192
- def preview_upload(self, file) -> Optional[Image.Image]:
193
- if file is None:
194
- return None
195
- try:
196
- img = Image.open(file.name).convert("RGB")
197
- img = img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS)
198
- return img
199
- except Exception as e:
200
- logger.warning("Upload preview failed: %s", e)
201
- return None
202
-
203
- def preview_gradient(self, gtype: str, color1: str, color2: str, angle: int) -> Image.Image:
204
- spec = {
205
- "type": (gtype or "linear").lower(), # "linear" or "radial" (linear in fallback)
206
- "start": _hex_to_rgb(color1 or "#222222"),
207
- "end": _hex_to_rgb(color2 or "#888888"),
208
- "angle_deg": float(angle or 0),
209
- }
210
- bg = create_gradient_background(spec, PREVIEW_W, PREVIEW_H)
211
- return _np_to_pil(bg)
212
-
213
- # ---- AI BG: lazy-load + reuse pipe ----
214
- def _ensure_t2i(self):
215
- """
216
- Load a text-to-image pipeline once with memory-efficient settings.
217
- Returns (pipe, model_id, msg).
218
- """
219
- if self.t2i_pipe is not None:
220
- return self.t2i_pipe, self.t2i_model_id, "AI generator ready"
221
-
222
- try:
223
- import torch
224
- from diffusers import AutoPipelineForText2Image, StableDiffusionPipeline
225
- except Exception as e:
226
- return None, None, f"AI generation unavailable (missing deps): {e}"
227
-
228
- token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
229
- device = "cuda" if getattr(__import__("torch"), "cuda", None) and __import__("torch").cuda.is_available() else "cpu"
230
-
231
- # Try to estimate VRAM to pick a model
232
- vram_gb = None
233
- try:
234
- vram_gb = self.device_mgr.get_device_memory_gb()
235
- except Exception:
236
- pass
237
-
238
- if device == "cuda":
239
- if vram_gb and vram_gb >= 12:
240
- model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/sdxl-turbo")
241
- else:
242
- model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/sd-turbo")
243
- else:
244
- model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/stable-diffusion-2-1")
245
-
246
- logger.info("Loading text-to-image model: %s (device=%s, VRAM=%s GB)", model_id, device, vram_gb)
247
- dtype = __import__("torch").float16 if device == "cuda" else __import__("torch").float32
248
-
249
- pipe = None
250
- try:
251
- # Unified API for turbo/SDXL/SD
252
- pipe = AutoPipelineForText2Image.from_pretrained(
253
- model_id,
254
- torch_dtype=dtype,
255
- use_safetensors=True,
256
- token=token,
257
- )
258
- except Exception as e1:
259
- try:
260
- pipe = StableDiffusionPipeline.from_pretrained(
261
- model_id,
262
- torch_dtype=dtype,
263
- use_safetensors=True,
264
- safety_checker=None,
265
- feature_extractor=None,
266
- use_auth_token=token,
267
- )
268
- except Exception as e2:
269
- return None, None, f"AI model load failed: {e1} / {e2}"
270
-
271
- # Memory/perf knobs
272
- try: pipe.set_progress_bar_config(disable=True)
273
- except Exception: pass
274
- try: pipe.enable_attention_slicing()
275
- except Exception: pass
276
- try: pipe.enable_vae_slicing()
277
- except Exception: pass
278
-
279
- if device == "cuda":
280
- try: pipe.enable_xformers_memory_efficient_attention()
281
- except Exception: pass
282
- pipe = pipe.to(device)
283
- else:
284
- try: pipe.enable_sequential_cpu_offload()
285
- except Exception: pass
286
-
287
- self.t2i_pipe = pipe
288
- self.t2i_model_id = model_id
289
- return pipe, model_id, f"AI model loaded: {model_id}"
290
-
291
- def ai_generate_background(self, prompt: str, seed: int, width: int, height: int) -> Tuple[Optional[Image.Image], Optional[str], str]:
292
- """
293
- Generate a background and save to /tmp. Returns (preview_img, path, status).
294
- """
295
- pipe, model_id, status_msg = self._ensure_t2i()
296
- if pipe is None:
297
- logger.warning(status_msg)
298
- return None, None, status_msg
299
-
300
- # Sizes: multiples of 8, clamped to safe range
301
- w = _div8(width or PREVIEW_W)
302
- h = _div8(height or PREVIEW_H)
303
- w = max(256, min(w, 1536))
304
- h = max(256, min(h, 1536))
305
-
306
- # Prompt defaults aimed at "office-like" backgrounds
307
- prompt = (prompt or "professional modern office background, neutral colors, soft depth of field, clean, minimal, photorealistic")
308
- negative = "text, watermark, logo, people, person, artifact, noisy, blurry"
309
-
310
- try:
311
- import torch
312
- device = "cuda" if getattr(torch, "cuda", None) and torch.cuda.is_available() else "cpu"
313
- try:
314
- g = torch.Generator(device=device).manual_seed(int(seed)) if seed is not None else None
315
- except Exception:
316
- g = None
317
-
318
- steps = 4 if ("turbo" in (self.t2i_model_id or "").lower()) else 25
319
- guidance = 1.0 if ("turbo" in (self.t2i_model_id or "").lower()) else 7.0
320
-
321
- with torch.inference_mode():
322
- if device == "cuda":
323
- with torch.autocast("cuda"):
324
- out = pipe(
325
- prompt=prompt,
326
- negative_prompt=negative,
327
- height=h,
328
- width=w,
329
- guidance_scale=guidance,
330
- num_inference_steps=steps,
331
- generator=g,
332
- )
333
- else:
334
- out = pipe(
335
- prompt=prompt,
336
- negative_prompt=negative,
337
- height=h,
338
- width=w,
339
- guidance_scale=guidance,
340
- num_inference_steps=steps,
341
- generator=g,
342
- )
343
- img = out.images[0]
344
- tmp_path = f"/tmp/ai_bg_{int(time.time())}.png"
345
- img.save(tmp_path)
346
-
347
- return img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS), tmp_path, f"{status_msg} • Generated {w}x{h}"
348
- except Exception as e:
349
- logger.exception("AI generation error")
350
- return None, None, f"AI generation failed: {e}"
351
-
352
- # ---- PROCESS VIDEO ----
353
- def process_video(
354
- self,
355
- video: str,
356
- bg_source: str,
357
- preset_key: str,
358
- custom_bg_file,
359
- grad_type: str,
360
- grad_color1: str,
361
- grad_color2: str,
362
- grad_angle: int,
363
- ai_bg_path: Optional[str],
364
- ):
365
- if not self.models_loaded:
366
- return None, "Models not loaded yet"
367
- if not video:
368
- return None, "Please upload a video first."
369
-
370
- logger.info(
371
- "process_video called (video=%s, source=%s, preset=%s, file=%s, grad=%s, ai=%s)",
372
- video,
373
- bg_source,
374
- preset_key,
375
- getattr(custom_bg_file, "name", None) if custom_bg_file else None,
376
- {"type": grad_type, "c1": grad_color1, "c2": grad_color2, "angle": grad_angle},
377
- ai_bg_path,
378
- )
379
-
380
- output_path = f"/tmp/output_{int(time.time())}.mp4"
381
-
382
- # Validate input video
383
- ok = validate_video_file(video)
384
- if not ok:
385
- logger.warning("Invalid/unreadable video: %s", video)
386
- return None, "Invalid or unreadable video file"
387
-
388
- # Build bg_config based on source
389
- src = (bg_source or "Preset").lower()
390
- if src == "upload" and custom_bg_file is not None:
391
- bg_cfg: Dict[str, Any] = {"custom_path": custom_bg_file.name}
392
- elif src == "gradient":
393
- bg_cfg = {
394
- "gradient": {
395
- "type": (grad_type or "linear").lower(),
396
- "start": _hex_to_rgb(grad_color1 or "#222222"),
397
- "end": _hex_to_rgb(grad_color2 or "#888888"),
398
- "angle_deg": float(grad_angle or 0),
399
- }
400
- }
401
- elif src == "ai generate" and ai_bg_path:
402
- bg_cfg = {"custom_path": ai_bg_path}
403
- else:
404
- key = preset_key if preset_key in PROFESSIONAL_BACKGROUNDS else "office"
405
- bg_cfg = {"background_choice": key}
406
-
407
- try:
408
- result = self.core_processor.process_video(
409
- input_path=video,
410
- output_path=output_path,
411
- bg_config=bg_cfg,
412
- )
413
- logger.info("Core processing done → %s", output_path)
414
-
415
- output_with_audio = self.audio_proc.add_audio_to_video(video, output_path)
416
- logger.info("Audio merged → %s", output_with_audio)
417
-
418
- frames = (result.get('frames') if isinstance(result, dict) else None) or "n/a"
419
- return output_with_audio, f"Processing complete ({frames} frames, background={bg_source})"
420
-
421
- except Exception as e:
422
- logger.exception("Processing failed")
423
- return None, f"Processing failed: {e}"
424
-
425
- # 7) Gradio UI
426
- def create_csp_safe_gradio():
427
- import gradio as gr
428
- app = VideoBackgroundApp()
429
-
430
- with gr.Blocks(
431
- title="BackgroundFX Pro - CSP Safe",
432
- analytics_enabled=False,
433
- css="""
434
- .gradio-container { max-width: 1100px; margin: auto; }
435
- """
436
- ) as demo:
437
- gr.Markdown("# 🎬 BackgroundFX Pro (CSP-Safe)")
438
- gr.Markdown("Replace your video background with cinema-quality AI matting. Now with live background preview.")
439
-
440
- with gr.Row():
441
- with gr.Column(scale=1):
442
- video = gr.Video(label="Upload Video")
443
- bg_source = gr.Radio(
444
- ["Preset", "Upload", "Gradient", "AI Generate"],
445
- value="Preset",
446
- label="Background Source",
447
- interactive=True,
448
- )
449
-
450
- # PRESET
451
- preset_choices = list(PROFESSIONAL_BACKGROUNDS.keys())
452
- default_preset = "office" if "office" in preset_choices else (preset_choices[0] if preset_choices else "office")
453
- preset_key = gr.Dropdown(choices=preset_choices, value=default_preset, label="Preset")
454
-
455
- # UPLOAD
456
- custom_bg = gr.File(label="Custom Background (Image)", file_types=["image"], visible=False)
457
-
458
- # GRADIENT
459
- grad_type = gr.Dropdown(choices=["Linear", "Radial"], value="Linear", label="Gradient Type", visible=False)
460
- grad_color1 = gr.ColorPicker(value="#222222", label="Start Color", visible=False)
461
- grad_color2 = gr.ColorPicker(value="#888888", label="End Color", visible=False)
462
- grad_angle = gr.Slider(0, 360, value=0, step=1, label="Angle (degrees)", visible=False)
463
-
464
- # AI
465
- ai_prompt = gr.Textbox(label="AI Prompt", placeholder="e.g., sunlit modern office, soft bokeh, neutral palette", visible=False)
466
- ai_seed = gr.Slider(0, 2**31-1, step=1, value=42, label="Seed", visible=False)
467
- ai_size = gr.Dropdown(choices=["640x360","960x540","1280x720"], value="640x360", label="AI Image Size", visible=False)
468
- ai_go = gr.Button("✨ Generate Background", visible=False, variant="secondary")
469
- ai_status = gr.Markdown(visible=False)
470
- ai_bg_path_state = gr.State(value=None) # store /tmp path
471
-
472
- btn_load = gr.Button("🔄 Load Models", variant="secondary")
473
- btn_run = gr.Button("🎬 Process Video", variant="primary")
474
-
475
- with gr.Column(scale=1):
476
- status = gr.Textbox(label="Status", lines=4)
477
- bg_preview = gr.Image(label="Background Preview", width=PREVIEW_W, height=PREVIEW_H, interactive=False)
478
- out_video = gr.Video(label="Processed Video")
479
-
480
- # ---------- UI wiring ----------
481
-
482
- # background source → show/hide controls
483
- def on_source_toggle(src):
484
- src = (src or "Preset").lower()
485
- return (
486
- gr.update(visible=(src == "preset")),
487
- gr.update(visible=(src == "upload")),
488
- gr.update(visible=(src == "gradient")),
489
- gr.update(visible=(src == "gradient")),
490
- gr.update(visible=(src == "gradient")),
491
- gr.update(visible=(src == "gradient")),
492
- gr.update(visible=(src == "ai generate")),
493
- gr.update(visible=(src == "ai generate")),
494
- gr.update(visible=(src == "ai generate")),
495
- gr.update(visible=(src == "ai generate")),
496
- gr.update(visible=(src == "ai generate")),
497
- )
498
- bg_source.change(
499
- fn=on_source_toggle,
500
- inputs=[bg_source],
501
- outputs=[preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle, ai_prompt, ai_seed, ai_size, ai_go, ai_status],
502
- )
503
-
504
- # ✅ Clear any previous AI image path when switching source (avoids stale AI background)
505
- def _clear_ai_state(_):
506
- return None
507
- bg_source.change(fn=_clear_ai_state, inputs=[bg_source], outputs=[ai_bg_path_state])
508
-
509
- # When source changes, also refresh preview based on visible controls
510
- def on_source_preview(src, pkey, gt, c1, c2, ang):
511
- src_l = (src or "Preset").lower()
512
- if src_l == "preset":
513
- return app.preview_preset(pkey)
514
- elif src_l == "gradient":
515
- return app.preview_gradient(gt, c1, c2, ang)
516
- # For upload/AI we keep whatever the component change handler sets (don’t overwrite)
517
- return gr.update() # no-op
518
- bg_source.change(
519
- fn=on_source_preview,
520
- inputs=[bg_source, preset_key, grad_type, grad_color1, grad_color2, grad_angle],
521
- outputs=[bg_preview]
522
- )
523
-
524
- # live previews
525
- preset_key.change(fn=lambda k: app.preview_preset(k), inputs=[preset_key], outputs=[bg_preview])
526
- custom_bg.change(fn=lambda f: app.preview_upload(f), inputs=[custom_bg], outputs=[bg_preview])
527
- for comp in (grad_type, grad_color1, grad_color2, grad_angle):
528
- comp.change(
529
- fn=lambda gt, c1, c2, ang: app.preview_gradient(gt, c1, c2, ang),
530
- inputs=[grad_type, grad_color1, grad_color2, grad_angle],
531
- outputs=[bg_preview],
532
- )
533
-
534
- # AI generate
535
- def ai_generate(prompt, seed, size):
536
- try:
537
- w, h = map(int, (size or "640x360").split("x"))
538
- except Exception:
539
- w, h = PREVIEW_W, PREVIEW_H
540
- img, path, msg = app.ai_generate_background(
541
- prompt or "professional modern office background, neutral colors, depth of field",
542
- int(seed) if seed is not None else 42,
543
- w, h
544
- )
545
- return img, (path or None), msg
546
- ai_go.click(fn=ai_generate, inputs=[ai_prompt, ai_seed, ai_size], outputs=[bg_preview, ai_bg_path_state, ai_status])
547
-
548
- # model load / run
549
- def safe_load():
550
- msg = app.load_models()
551
- logger.info("UI: models loaded")
552
- return msg, app.preview_preset(preset_key.value if hasattr(preset_key, "value") else "office")
553
- btn_load.click(fn=safe_load, outputs=[status, bg_preview])
554
-
555
- def safe_process(vid, src, pkey, file, gtype, c1, c2, ang, ai_path):
556
- return app.process_video(vid, src, pkey, file, gtype, c1, c2, ang, ai_path)
557
- btn_run.click(
558
- fn=safe_process,
559
- inputs=[video, bg_source, preset_key, custom_bg, grad_type, grad_color1, grad_color2, grad_angle, ai_bg_path_state],
560
- outputs=[out_video, status]
561
- )
562
-
563
- return demo
564
-
565
- # 8) Launch
566
- if __name__ == "__main__":
567
- logger.info("Launching CSP-safe Gradio interface for Hugging Face Spaces")
568
- demo = create_csp_safe_gradio()
569
- demo.queue().launch(
570
- server_name="0.0.0.0",
571
- server_port=7860,
572
- show_error=True,
573
- debug=False,
574
- inbrowser=False
575
- )
 
1
  #!/usr/bin/env python3
2
  """
3
+ utils package (lightweight __init__)
4
+ - Export only light helpers/consts at import time
5
+ - Provide LAZY wrappers for heavy CV functions so legacy imports still work:
6
+ from utils import segment_person_hq -> OK (resolved at call time)
 
7
  """
8
 
9
+ from __future__ import annotations
10
+
11
+ import os
12
+ import logging
13
+ from typing import Dict, Any, Tuple, Optional
14
+
15
+ import numpy as np # light; OK at import time
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+ # ---------------------------------------------------------------------
20
+ # Background presets & builders (lightweight)
21
+ # ---------------------------------------------------------------------
22
+
23
# Named background presets used by the UI.
# Schema per entry: "color" is an (r, g, b) tuple — RGB order, per the
# create_professional_background contract — and "gradient" selects a
# vertical dark->color fade instead of a solid fill.
PROFESSIONAL_BACKGROUNDS: Dict[str, Dict[str, Any]] = {
    "office": {"color": (240, 248, 255), "gradient": True},   # alice blue
    "studio": {"color": (32, 32, 32), "gradient": False},     # near-black gray
    "nature": {"color": (34, 139, 34), "gradient": True},     # forest green
    "abstract": {"color": (75, 0, 130), "gradient": True},    # indigo
    "white": {"color": (255, 255, 255), "gradient": False},
    "black": {"color": (0, 0, 0), "gradient": False},
    # add more if you like
}
32
+
33
+ def _solid_bg(color: Tuple[int,int,int], width: int, height: int) -> np.ndarray:
34
+ return np.full((height, width, 3), tuple(int(x) for x in color), dtype=np.uint8)
35
+
36
+ def _vertical_gradient(top: Tuple[int,int,int], bottom: Tuple[int,int,int], width: int, height: int) -> np.ndarray:
37
+ bg = np.zeros((height, width, 3), dtype=np.uint8)
38
+ for y in range(height):
39
+ t = y / max(1, height - 1)
40
+ r = int(top[0] * (1 - t) + bottom[0] * t)
41
+ g = int(top[1] * (1 - t) + bottom[1] * t)
42
+ b = int(top[2] * (1 - t) + bottom[2] * t)
43
+ bg[y, :] = (r, g, b)
44
+ return bg
45
+
46
def create_professional_background(key_or_cfg: Any, width: int, height: int) -> np.ndarray:
    """Build an RGB uint8 background of shape (H, W, 3).

    *key_or_cfg* may be a preset name from PROFESSIONAL_BACKGROUNDS or a
    config dict of the same shape: {"color": (r, g, b), "gradient": bool}.
    Unknown keys and unsupported types fall back to the "office" preset.
    """
    if isinstance(key_or_cfg, dict):
        cfg = key_or_cfg
    elif isinstance(key_or_cfg, str):
        cfg = PROFESSIONAL_BACKGROUNDS.get(key_or_cfg, PROFESSIONAL_BACKGROUNDS["office"])
    else:
        cfg = PROFESSIONAL_BACKGROUNDS["office"]

    color = tuple(int(channel) for channel in cfg.get("color", (255, 255, 255)))

    if not bool(cfg.get("gradient", False)):
        return _solid_bg(color, width, height)

    # Gradient presets fade from a 30%-darkened shade into the base color.
    dark = (int(color[0] * 0.7), int(color[1] * 0.7), int(color[2] * 0.7))
    return _vertical_gradient(dark, color, width, height)
69
+
70
def create_gradient_background(spec: Dict[str, Any], width: int, height: int) -> np.ndarray:
    """
    Render a gradient background as an RGB uint8 array (H, W, 3).

    spec: {"type": "linear"|"radial", "start": (r,g,b)|"#RRGGBB",
           "end": (r,g,b)|"#RRGGBB", "angle_deg": float}
    NOTE: spec["type"] is currently ignored — "radial" renders the same
    linear fallback until a radial implementation is added.
    """
    import re
    import cv2  # deferred import keeps top-level package import light

    hex_pattern = re.compile(r"^#[0-9a-fA-F]{6}$")

    def _coerce_rgb(value):
        # 3-sequence -> tuple of ints; "#RRGGBB" -> parsed; otherwise white.
        if isinstance(value, (list, tuple)) and len(value) == 3:
            return tuple(int(ch) for ch in value)
        if isinstance(value, str) and hex_pattern.match(value):
            return tuple(int(value[i:i + 2], 16) for i in (1, 3, 5))
        return (255, 255, 255)

    start_rgb = _coerce_rgb(spec.get("start", (32, 32, 32)))
    end_rgb = _coerce_rgb(spec.get("end", (200, 200, 200)))
    rotation = float(spec.get("angle_deg", 0.0))

    canvas = _vertical_gradient(start_rgb, end_rgb, width, height)

    # Spin the vertical ramp around the image center to honor angle_deg.
    matrix = cv2.getRotationMatrix2D((width / 2, height / 2), rotation, 1.0)
    return cv2.warpAffine(
        canvas, matrix, (width, height),
        flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101,
    )
96
+
97
+ # ---------------------------------------------------------------------
98
+ # Video validation (lightweight)
99
+ # ---------------------------------------------------------------------
100
def validate_video_file(video_path: str) -> bool:
    """
    Cheap sanity check for UI use: the path is non-empty, the file exists,
    cv2 can open it, and the first frame decodes. Any failure — including
    exceptions, which are logged as warnings — yields False.
    """
    try:
        if not video_path:
            return False
        if not os.path.exists(video_path):
            return False
        import cv2  # local import keeps package import light
        capture = cv2.VideoCapture(video_path)
        if not capture.isOpened():
            return False
        success, first_frame = capture.read()
        capture.release()
        return bool(success and first_frame is not None)
    except Exception as e:
        logger.warning("validate_video_file error: %s", e)
        return False
118
+
119
def validate_video_file_detail(video_path: str) -> Tuple[bool, str]:
    """Like validate_video_file, but return (ok, reason) for diagnostics."""
    if not video_path:
        return False, "No path provided"
    if not os.path.exists(video_path):
        return False, "File does not exist"
    try:
        import cv2  # local import keeps package import light
        capture = cv2.VideoCapture(video_path)
        if not capture.isOpened():
            return False, "cv2 could not open file"
        grabbed, frame = capture.read()
        capture.release()
        if grabbed and frame is not None:
            return True, "OK"
        return False, "Could not read first frame"
    except Exception as e:
        return False, f"cv2 error: {e}"
136
+
137
+ # ---------------------------------------------------------------------
138
+ # LAZY WRAPPERS (avoid importing utils.cv_processing at module import time)
139
+ # ---------------------------------------------------------------------
140
def segment_person_hq(*args, **kwargs):
    """Lazy re-export: resolve utils.cv_processing.segment_person_hq at call time."""
    from . import cv_processing
    return cv_processing.segment_person_hq(*args, **kwargs)
143
+
144
def refine_mask_hq(*args, **kwargs):
    """Lazy re-export: resolve utils.cv_processing.refine_mask_hq at call time."""
    from . import cv_processing
    return cv_processing.refine_mask_hq(*args, **kwargs)
147
+
148
def replace_background_hq(*args, **kwargs):
    """Lazy re-export: resolve utils.cv_processing.replace_background_hq at call time."""
    from . import cv_processing
    return cv_processing.replace_background_hq(*args, **kwargs)
151
+
152
# Public API of the utils package (controls `from utils import *`).
__all__ = [
    # backgrounds
    "PROFESSIONAL_BACKGROUNDS",
    "create_professional_background",
    "create_gradient_background",
    # validation
    "validate_video_file",
    "validate_video_file_detail",
    # lazy CV exports (back-compat; resolved at call time, see wrappers above)
    "segment_person_hq",
    "refine_mask_hq",
    "replace_background_hq",
]