MogensR committed on
Commit
32853be
·
1 Parent(s): 8fd190a

Update utils/__init__.py

Browse files
Files changed (1) hide show
  1. utils/__init__.py +163 -27
utils/__init__.py CHANGED
@@ -2,13 +2,20 @@
2
  """
3
  BackgroundFX Pro - CSP-Safe Application Entry Point
4
  Now with: live background preview + sources: Preset / Upload / Gradient / AI Generate
 
 
 
5
  """
6
 
7
  import early_env # <<< must be FIRST
8
 
9
- import os, time
10
  from typing import Optional, Dict, Any, Callable, Tuple
11
 
 
 
 
 
12
  # 1) CSP-safe Gradio env
13
  os.environ['GRADIO_ALLOW_FLAGGING'] = 'never'
14
  os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
@@ -45,14 +52,14 @@ def _patched_get_type(schema):
45
  from processing.video.video_processor import CoreVideoProcessor, ProcessorConfig
46
  from processing.audio.audio_processor import AudioProcessor
47
 
48
- # Background helpers
49
  from utils import PROFESSIONAL_BACKGROUNDS, validate_video_file, create_professional_background
50
  # Gradient helper (add to utils; fallback here for preview only if missing)
51
  try:
52
  from utils import create_gradient_background
53
  except Exception:
54
  def create_gradient_background(spec: Dict[str, Any], width: int, height: int):
55
- # Lightweight fallback preview (linear only)
56
  import numpy as np
57
  import cv2
58
  def _to_rgb(c):
@@ -122,6 +129,12 @@ def _np_to_pil(arr: np.ndarray) -> Image.Image:
122
  arr = arr.clip(0, 255).astype(np.uint8)
123
  return Image.fromarray(arr)
124
 
 
 
 
 
 
 
125
  # ---------- main app ----------
126
  class VideoBackgroundApp:
127
  def __init__(self):
@@ -132,6 +145,9 @@ def __init__(self):
132
  self.audio_proc = AudioProcessor()
133
  self.models_loaded = False
134
  self.core_processor: Optional[CoreVideoProcessor] = None
 
 
 
135
  logger.info("VideoBackgroundApp initialized (device=%s)", self.device_mgr.get_optimal_device())
136
 
137
  def load_models(self, progress_callback: Optional[Callable] = None) -> str:
@@ -194,29 +210,144 @@ def preview_gradient(self, gtype: str, color1: str, color2: str, angle: int) ->
194
  bg = create_gradient_background(spec, PREVIEW_W, PREVIEW_H)
195
  return _np_to_pil(bg)
196
 
197
- def ai_generate_background(self, prompt: str, seed: int, width: int, height: int) -> Tuple[Optional[Image.Image], Optional[str], str]:
 
198
  """
199
- Try generating a background with diffusers; save to /tmp and return (img, path, status).
 
200
  """
 
 
 
201
  try:
202
- from diffusers import StableDiffusionPipeline
203
  import torch
204
- model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/stable-diffusion-2-1")
205
- dtype = torch.float16 if torch.cuda.is_available() else torch.float32
206
- device = "cuda" if torch.cuda.is_available() else "cpu"
207
- pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=dtype).to(device)
208
- g = torch.Generator(device=device).manual_seed(int(seed)) if seed is not None else None
209
- if device == "cuda":
210
- with torch.autocast("cuda"):
211
- img = pipe(prompt, height=height, width=width, guidance_scale=7.0, num_inference_steps=25, generator=g).images[0]
 
 
 
 
 
 
 
 
 
212
  else:
213
- img = pipe(prompt, height=height, width=width, guidance_scale=7.0, num_inference_steps=25, generator=g).images[0]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
  tmp_path = f"/tmp/ai_bg_{int(time.time())}.png"
215
  img.save(tmp_path)
216
- return img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS), tmp_path, f"AI background generated ✓ ({os.path.basename(tmp_path)})"
 
217
  except Exception as e:
218
- logger.warning("AI generation unavailable: %s", e)
219
- return None, None, f"AI generation unavailable: {e}"
220
 
221
  # ---- PROCESS VIDEO ----
222
  def process_video(
@@ -233,14 +364,18 @@ def process_video(
233
  ):
234
  if not self.models_loaded:
235
  return None, "Models not loaded yet"
236
-
237
  if not video:
238
  return None, "Please upload a video first."
239
 
240
- logger.info("process_video called (video=%s, source=%s, preset=%s, file=%s, grad=%s, ai=%s)",
241
- video, bg_source, preset_key, getattr(custom_bg_file, "name", None) if custom_bg_file else None,
242
- {"type": grad_type, "c1": grad_color1, "c2": grad_color2, "angle": grad_angle},
243
- ai_bg_path)
 
 
 
 
 
244
 
245
  output_path = f"/tmp/output_{int(time.time())}.mp4"
246
 
@@ -273,7 +408,7 @@ def process_video(
273
  result = self.core_processor.process_video(
274
  input_path=video,
275
  output_path=output_path,
276
- bg_config=bg_cfg
277
  )
278
  logger.info("Core processing done → %s", output_path)
279
 
@@ -367,7 +502,7 @@ def on_source_toggle(src):
367
  )
368
 
369
  # ✅ Clear any previous AI image path when switching source (avoids stale AI background)
370
- def _clear_ai_state(_):
371
  return None
372
  bg_source.change(fn=_clear_ai_state, inputs=[bg_source], outputs=[ai_bg_path_state])
373
 
@@ -399,12 +534,13 @@ def on_source_preview(src, pkey, gt, c1, c2, ang):
399
  # AI generate
400
  def ai_generate(prompt, seed, size):
401
  try:
402
- w, h = map(int, size.split("x"))
403
  except Exception:
404
  w, h = PREVIEW_W, PREVIEW_H
405
  img, path, msg = app.ai_generate_background(
406
  prompt or "professional modern office background, neutral colors, depth of field",
407
- int(seed), w, h
 
408
  )
409
  return img, (path or None), msg
410
  ai_go.click(fn=ai_generate, inputs=[ai_prompt, ai_seed, ai_size], outputs=[bg_preview, ai_bg_path_state, ai_status])
 
2
  """
3
  BackgroundFX Pro - CSP-Safe Application Entry Point
4
  Now with: live background preview + sources: Preset / Upload / Gradient / AI Generate
5
+ - Lazy-loaded Diffusers pipeline (VRAM-aware: sd-turbo / sdxl-turbo / sd-2.1 CPU)
6
+ - Preview shows the exact background used
7
+ - Clears stale AI image when switching sources
8
  """
9
 
10
  import early_env # <<< must be FIRST
11
 
12
+ import os, time, math
13
  from typing import Optional, Dict, Any, Callable, Tuple
14
 
15
+ # Prefer a writable cache in constrained environments (e.g., HF Spaces)
16
+ os.environ.setdefault("HF_HOME", "/tmp/hf")
17
+ os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
18
+
19
  # 1) CSP-safe Gradio env
20
  os.environ['GRADIO_ALLOW_FLAGGING'] = 'never'
21
  os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
 
52
  from processing.video.video_processor import CoreVideoProcessor, ProcessorConfig
53
  from processing.audio.audio_processor import AudioProcessor
54
 
55
+ # Background helpers (kept lightweight to avoid cycles)
56
  from utils import PROFESSIONAL_BACKGROUNDS, validate_video_file, create_professional_background
57
  # Gradient helper (add to utils; fallback here for preview only if missing)
58
  try:
59
  from utils import create_gradient_background
60
  except Exception:
61
  def create_gradient_background(spec: Dict[str, Any], width: int, height: int):
62
+ # Lightweight fallback (linear+rotate only)
63
  import numpy as np
64
  import cv2
65
  def _to_rgb(c):
 
129
  arr = arr.clip(0, 255).astype(np.uint8)
130
  return Image.fromarray(arr)
131
 
132
+ def _div8(n: int) -> int:
133
+ # Ensure sizes are multiples of 8 for SD/VAEs (min 256)
134
+ n = int(n)
135
+ if n < 256: n = 256
136
+ return int(math.floor(n / 8.0) * 8)
137
+
138
  # ---------- main app ----------
139
  class VideoBackgroundApp:
140
  def __init__(self):
 
145
  self.audio_proc = AudioProcessor()
146
  self.models_loaded = False
147
  self.core_processor: Optional[CoreVideoProcessor] = None
148
+ # Text-to-image cache
149
+ self.t2i_pipe = None
150
+ self.t2i_model_id = None
151
  logger.info("VideoBackgroundApp initialized (device=%s)", self.device_mgr.get_optimal_device())
152
 
153
  def load_models(self, progress_callback: Optional[Callable] = None) -> str:
 
210
  bg = create_gradient_background(spec, PREVIEW_W, PREVIEW_H)
211
  return _np_to_pil(bg)
212
 
213
+ # ---- AI BG: lazy-load + reuse pipe ----
214
+ def _ensure_t2i(self):
215
  """
216
+ Load a text-to-image pipeline once with memory-efficient settings.
217
+ Returns (pipe, model_id, msg).
218
  """
219
+ if self.t2i_pipe is not None:
220
+ return self.t2i_pipe, self.t2i_model_id, "AI generator ready"
221
+
222
  try:
 
223
  import torch
224
+ from diffusers import AutoPipelineForText2Image, StableDiffusionPipeline
225
+ except Exception as e:
226
+ return None, None, f"AI generation unavailable (missing deps): {e}"
227
+
228
+ token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
229
+ device = "cuda" if getattr(__import__("torch"), "cuda", None) and __import__("torch").cuda.is_available() else "cpu"
230
+
231
+ # Try to estimate VRAM to pick a model
232
+ vram_gb = None
233
+ try:
234
+ vram_gb = self.device_mgr.get_device_memory_gb()
235
+ except Exception:
236
+ pass
237
+
238
+ if device == "cuda":
239
+ if vram_gb and vram_gb >= 12:
240
+ model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/sdxl-turbo")
241
  else:
242
+ model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/sd-turbo")
243
+ else:
244
+ model_id = os.environ.get("BGFX_T2I_MODEL", "stabilityai/stable-diffusion-2-1")
245
+
246
+ logger.info("Loading text-to-image model: %s (device=%s, VRAM=%s GB)", model_id, device, vram_gb)
247
+ dtype = __import__("torch").float16 if device == "cuda" else __import__("torch").float32
248
+
249
+ pipe = None
250
+ try:
251
+ # Unified API for turbo/SDXL/SD
252
+ pipe = AutoPipelineForText2Image.from_pretrained(
253
+ model_id,
254
+ torch_dtype=dtype,
255
+ use_safetensors=True,
256
+ token=token,
257
+ )
258
+ except Exception as e1:
259
+ try:
260
+ pipe = StableDiffusionPipeline.from_pretrained(
261
+ model_id,
262
+ torch_dtype=dtype,
263
+ use_safetensors=True,
264
+ safety_checker=None,
265
+ feature_extractor=None,
266
+ use_auth_token=token,
267
+ )
268
+ except Exception as e2:
269
+ return None, None, f"AI model load failed: {e1} / {e2}"
270
+
271
+ # Memory/perf knobs
272
+ try: pipe.set_progress_bar_config(disable=True)
273
+ except Exception: pass
274
+ try: pipe.enable_attention_slicing()
275
+ except Exception: pass
276
+ try: pipe.enable_vae_slicing()
277
+ except Exception: pass
278
+
279
+ if device == "cuda":
280
+ try: pipe.enable_xformers_memory_efficient_attention()
281
+ except Exception: pass
282
+ pipe = pipe.to(device)
283
+ else:
284
+ try: pipe.enable_sequential_cpu_offload()
285
+ except Exception: pass
286
+
287
+ self.t2i_pipe = pipe
288
+ self.t2i_model_id = model_id
289
+ return pipe, model_id, f"AI model loaded: {model_id}"
290
+
291
+ def ai_generate_background(self, prompt: str, seed: int, width: int, height: int) -> Tuple[Optional[Image.Image], Optional[str], str]:
292
+ """
293
+ Generate a background and save to /tmp. Returns (preview_img, path, status).
294
+ """
295
+ pipe, model_id, status_msg = self._ensure_t2i()
296
+ if pipe is None:
297
+ logger.warning(status_msg)
298
+ return None, None, status_msg
299
+
300
+ # Sizes: multiples of 8, clamped to safe range
301
+ w = _div8(width or PREVIEW_W)
302
+ h = _div8(height or PREVIEW_H)
303
+ w = max(256, min(w, 1536))
304
+ h = max(256, min(h, 1536))
305
+
306
+ # Prompt defaults aimed at "office-like" backgrounds
307
+ prompt = (prompt or "professional modern office background, neutral colors, soft depth of field, clean, minimal, photorealistic")
308
+ negative = "text, watermark, logo, people, person, artifact, noisy, blurry"
309
+
310
+ try:
311
+ import torch
312
+ device = "cuda" if getattr(torch, "cuda", None) and torch.cuda.is_available() else "cpu"
313
+ try:
314
+ g = torch.Generator(device=device).manual_seed(int(seed)) if seed is not None else None
315
+ except Exception:
316
+ g = None
317
+
318
+ steps = 4 if ("turbo" in (self.t2i_model_id or "").lower()) else 25
319
+ guidance = 1.0 if ("turbo" in (self.t2i_model_id or "").lower()) else 7.0
320
+
321
+ with torch.inference_mode():
322
+ if device == "cuda":
323
+ with torch.autocast("cuda"):
324
+ out = pipe(
325
+ prompt=prompt,
326
+ negative_prompt=negative,
327
+ height=h,
328
+ width=w,
329
+ guidance_scale=guidance,
330
+ num_inference_steps=steps,
331
+ generator=g,
332
+ )
333
+ else:
334
+ out = pipe(
335
+ prompt=prompt,
336
+ negative_prompt=negative,
337
+ height=h,
338
+ width=w,
339
+ guidance_scale=guidance,
340
+ num_inference_steps=steps,
341
+ generator=g,
342
+ )
343
+ img = out.images[0]
344
  tmp_path = f"/tmp/ai_bg_{int(time.time())}.png"
345
  img.save(tmp_path)
346
+
347
+ return img.resize((PREVIEW_W, PREVIEW_H), Image.LANCZOS), tmp_path, f"{status_msg} • Generated {w}x{h}"
348
  except Exception as e:
349
+ logger.exception("AI generation error")
350
+ return None, None, f"AI generation failed: {e}"
351
 
352
  # ---- PROCESS VIDEO ----
353
  def process_video(
 
364
  ):
365
  if not self.models_loaded:
366
  return None, "Models not loaded yet"
 
367
  if not video:
368
  return None, "Please upload a video first."
369
 
370
+ logger.info(
371
+ "process_video called (video=%s, source=%s, preset=%s, file=%s, grad=%s, ai=%s)",
372
+ video,
373
+ bg_source,
374
+ preset_key,
375
+ getattr(custom_bg_file, "name", None) if custom_bg_file else None,
376
+ {"type": grad_type, "c1": grad_color1, "c2": grad_color2, "angle": grad_angle},
377
+ ai_bg_path,
378
+ )
379
 
380
  output_path = f"/tmp/output_{int(time.time())}.mp4"
381
 
 
408
  result = self.core_processor.process_video(
409
  input_path=video,
410
  output_path=output_path,
411
+ bg_config=bg_cfg,
412
  )
413
  logger.info("Core processing done → %s", output_path)
414
 
 
502
  )
503
 
504
  # ✅ Clear any previous AI image path when switching source (avoids stale AI background)
505
+ def _clear_ai_state(_):
506
  return None
507
  bg_source.change(fn=_clear_ai_state, inputs=[bg_source], outputs=[ai_bg_path_state])
508
 
 
534
  # AI generate
535
  def ai_generate(prompt, seed, size):
536
  try:
537
+ w, h = map(int, (size or "640x360").split("x"))
538
  except Exception:
539
  w, h = PREVIEW_W, PREVIEW_H
540
  img, path, msg = app.ai_generate_background(
541
  prompt or "professional modern office background, neutral colors, depth of field",
542
+ int(seed) if seed is not None else 42,
543
+ w, h
544
  )
545
  return img, (path or None), msg
546
  ai_go.click(fn=ai_generate, inputs=[ai_prompt, ai_seed, ai_size], outputs=[bg_preview, ai_bg_path_state, ai_status])