import os
from dataclasses import dataclass
from typing import List, Tuple, Optional

import warnings
warnings.filterwarnings("ignore")

os.environ["OPENCV_LOG_LEVEL"] = "ERROR"

import numpy as np
import torch
import comfy
from PIL import Image
import cv2

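# Best-effort: also silence OpenCV's native logger on builds that expose setLogLevel.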
try:
    if hasattr(cv2, "setLogLevel"):
        try:
            lvl = cv2.LOG_LEVEL_ERROR if hasattr(cv2, "LOG_LEVEL_ERROR") else 3
            cv2.setLogLevel(lvl)
        except Exception:
            pass
except Exception:
    pass

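# Performance flags: cuDNN autotune, plus TF32 matmuls when a CUDA device is present.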
torch.backends.cudnn.benchmark = True
if torch.cuda.is_available():
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    try:
        torch.set_float32_matmul_precision("high")
    except Exception:
        pass

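# Prefer Impact Pack's SEG dataclass; fall back to a local stand-in with the same fields.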
try:
    from impact.core import SEG as _IMPACT_SEG
    _USE_IMPACT_SEG = True
except Exception:
    _USE_IMPACT_SEG = False


@dataclass
class _LocalSEG:
    cropped_image: Optional[torch.Tensor]
    cropped_mask: np.ndarray
    confidence: float
    crop_region: Tuple[int, int, int, int]
    bbox: Tuple[int, int, int, int]
    label: str
    control_net_wrapper: Optional[object] = None


SEG = _IMPACT_SEG if _USE_IMPACT_SEG else _LocalSEG


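# Embedded model assets are expected in the 'assets' folder next to this file.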
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

YOLO_MODEL_PATH = os.path.join(BASE_DIR, "assets", "face_yolov8m_salia.pt")
YOLO_IMGSZ = 640

SAM_CKPT_PATH = os.path.join(BASE_DIR, "assets", "sam_vit_b_01ec64_salia.pth")

_CACHED_YOLO_MODEL = None
_CACHED_ULTRA_DETECTOR = None


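# Convert the first image of a Comfy IMAGE batch (NHWC, values in [0..1]) to a PIL RGB image.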
def _tensor_to_pil(image: torch.Tensor) -> Image.Image:
    img = image[0].detach().cpu().clamp(0, 1).numpy()
    img = (img * 255.0).round().astype(np.uint8)
    return Image.fromarray(img, mode="RGB")


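# Expand a detection bbox by crop_factor around its center, then clamp the crop to the image bounds.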
def _make_crop_region(w: int, h: int, bbox_xyxy, crop_factor: float) -> Tuple[int, int, int, int]:
    x1, y1, x2, y2 = map(int, bbox_xyxy)
    cx = (x1 + x2) * 0.5
    cy = (y1 + y2) * 0.5
    bw = (x2 - x1)
    bh = (y2 - y1)
    new_w = max(1, int(bw * crop_factor))
    new_h = max(1, int(bh * crop_factor))

    nx1 = int(max(0, round(cx - new_w * 0.5)))
    ny1 = int(max(0, round(cy - new_h * 0.5)))
    nx2 = int(min(w, nx1 + new_w))
    ny2 = int(min(h, ny1 + new_h))

    nx1 = max(0, min(nx1, w - 1))
    ny1 = max(0, min(ny1, h - 1))
    nx2 = max(nx1 + 1, min(nx2, w))
    ny2 = max(ny1 + 1, min(ny2, h))
    return (nx1, ny1, nx2, ny2)


def _crop_tensor_image(image: torch.Tensor, crop: Tuple[int, int, int, int]) -> torch.Tensor:
    x1, y1, xb, yb = crop
    return image[:, y1:yb, x1:xb, :].contiguous()


def _crop_ndarray(mask: np.ndarray, crop: Tuple[int, int, int, int]) -> np.ndarray:
    x1, y1, xb, yb = crop
    return mask[int(y1):int(yb), int(x1):int(xb)]


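# A positive factor dilates each mask, a negative factor erodes it; the kernel size is |factor|.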
def _dilate_masks(segmasks: List[Tuple[np.ndarray, np.ndarray, float]], factor: int):
    if factor == 0 or not segmasks:
        return segmasks
    k = abs(int(factor))
    if k < 1:
        return segmasks
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k, k))
    do_dilate = factor > 0
    out = []
    for (bbox, m, conf) in segmasks:
        u8 = (m * 255.0).astype(np.uint8) if m.dtype != np.uint8 else m
        d = cv2.dilate(u8, kernel, iterations=1) if do_dilate else cv2.erode(u8, kernel, iterations=1)
        out.append((bbox, d.astype(np.float32) / 255.0, conf))
    return out


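# OR all detection masks together into one full-image float mask (or None when nothing was detected).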
def _combine_masks(segmasks: List[Tuple[np.ndarray, np.ndarray, float]]) -> Optional[torch.Tensor]:
    if not segmasks:
        return None
    h = segmasks[0][1].shape[0]
    w = segmasks[0][1].shape[1]
    acc = np.zeros((h, w), dtype=np.uint8)
    for _, m, _ in segmasks:
        u8 = (m * 255.0).astype(np.uint8) if m.dtype != np.uint8 else m
        acc = cv2.bitwise_or(acc, u8)
    return torch.from_numpy(acc.astype(np.float32) / 255.0)


def _pick_device_str(user_device: str = "") -> str:
    if user_device:
        return user_device
    return "cuda" if torch.cuda.is_available() else "cpu"


@torch.inference_mode()
def _inference_bbox(model, image_pil: Image.Image, confidence: float = 0.3, device: str = ""):
    """
    Returns results = [labels(str), bboxes(xyxy), segms(full-image bool masks), conf(float)].
    For bbox models, segm "masks" are rectangles from the boxes (Subpack parity).
    """
    pred = model(
        image_pil,
        conf=confidence,
        device=_pick_device_str(device),
        verbose=False,
        imgsz=YOLO_IMGSZ,
    )

    p0 = pred[0]
    boxes = p0.boxes
    bboxes = boxes.xyxy.detach().cpu().numpy()

    W, H = image_pil.size
    segms = []
    for x0, y0, x1, y1 in bboxes:
        m = np.zeros((H, W), np.uint8)
        cv2.rectangle(m, (int(x0), int(y0)), (int(x1), int(y1)), 255, -1)
        segms.append(m.astype(bool))

    if bboxes.shape[0] == 0:
        return [[], [], [], []]

    results = [[], [], [], []]
    names = p0.names
    for i, (bbox, segm) in enumerate(zip(bboxes, segms)):
        cls_i = int(boxes.cls[i].item())
        results[0].append(names[cls_i])
        results[1].append(bbox)
        results[2].append(segm)
        results[3].append(float(boxes.conf[i].item()))
    return results


def _create_segmasks(results):
    bboxes = results[1]
    segms = results[2]
    confs = results[3]
    out = []
    for i in range(len(segms)):
        out.append((bboxes[i], segms[i].astype(np.float32), confs[i]))
    return out


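# Minimal bbox detector exposing the detect / detect_combined / setAux interface FaceDetailer uses;
# detect() returns SEGS in the form ((H, W), [SEG, ...]).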
class UltraBBoxDetector:
    def __init__(self, yolo_model):
        self.bbox_model = yolo_model

    def detect(self, image, threshold, dilation, crop_factor, drop_size=1, detailer_hook=None):
        drop_size = max(int(drop_size), 1)
        detected = _inference_bbox(self.bbox_model, _tensor_to_pil(image), threshold)
        segmasks = _create_segmasks(detected)
        if int(dilation) != 0:
            segmasks = _dilate_masks(segmasks, int(dilation))

        H = int(image.shape[1])
        W = int(image.shape[2])
        items = []
        for (bbox_xyxy, full_mask, conf), label in zip(segmasks, detected[0]):
            x1, y1, x2, y2 = map(int, bbox_xyxy)
            if (x2 - x1) > drop_size and (y2 - y1) > drop_size:
                crop_region = _make_crop_region(W, H, (x1, y1, x2, y2), float(crop_factor))
                if detailer_hook is not None and hasattr(detailer_hook, "post_crop_region"):
                    crop_region = detailer_hook.post_crop_region(W, H, (x1, y1, x2, y2), crop_region)

                cropped_image = _crop_tensor_image(image, crop_region)
                cropped_mask = _crop_ndarray(full_mask, crop_region).astype(np.float32)
                items.append(SEG(cropped_image, cropped_mask, float(conf), crop_region, (x1, y1, x2, y2), str(label), None))

        segs = ((H, W), items)
        if detailer_hook is not None and hasattr(detailer_hook, "post_detection"):
            segs = detailer_hook.post_detection(segs)
        return segs

    def detect_combined(self, image, threshold, dilation):
        detected = _inference_bbox(self.bbox_model, _tensor_to_pil(image), threshold)
        segmasks = _create_segmasks(detected)
        if int(dilation) != 0:
            segmasks = _dilate_masks(segmasks, int(dilation))
        return _combine_masks(segmasks)

    def setAux(self, x):
        pass


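# Load the bundled YOLO weights once; device placement, layer fusion, and half precision are best-effort.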
def _load_ultralytics_model(model_path: str):
    try:
        from ultralytics import YOLO
    except Exception as e:
        raise RuntimeError(
            "[FaceDetailerStandalone] The 'ultralytics' package is required for the embedded bbox detector.\n"
            "Install in your ComfyUI python: python -m pip install --upgrade ultralytics"
        ) from e

    if not os.path.isfile(model_path):
        raise FileNotFoundError(
            "[FaceDetailerStandalone] Embedded YOLO model file not found.\n"
            f"Expected at: {model_path}\n"
            "Please place 'face_yolov8m_salia.pt' in the 'assets' folder next to this node."
        )

    yolo = YOLO(model_path)

    try:
        dev = _pick_device_str()
        try:
            yolo.to(dev)
        except Exception:
            yolo.model.to(dev)
    except Exception:
        pass

    try:
        yolo.fuse()
    except Exception:
        pass

    try:
        if torch.cuda.is_available():
            yolo.model.half()
    except Exception:
        pass

    return yolo


def _get_embedded_detector():
    global _CACHED_YOLO_MODEL, _CACHED_ULTRA_DETECTOR
    if _CACHED_ULTRA_DETECTOR is not None:
        return _CACHED_ULTRA_DETECTOR
    if _CACHED_YOLO_MODEL is None:
        _CACHED_YOLO_MODEL = _load_ultralytics_model(YOLO_MODEL_PATH)
    _CACHED_ULTRA_DETECTOR = UltraBBoxDetector(_CACHED_YOLO_MODEL)
    return _CACHED_ULTRA_DETECTOR


def _to_numpy_rgb(image_tensor):
    """
    Comfy 'IMAGE' is NHWC in [0..1]. Convert to uint8 HxWx3 RGB numpy.
    Accepts torch.Tensor (NHWC) or numpy already in HWC.
    """
    if isinstance(image_tensor, torch.Tensor):
        img = image_tensor
        if img.dim() == 4 and img.shape[0] == 1:
            img = img[0]
        img = (img.clamp(0, 1) * 255.0).to(torch.uint8).cpu().numpy()
        return img
    elif isinstance(image_tensor, np.ndarray):
        if image_tensor.dtype != np.uint8:
            img = np.clip(image_tensor, 0, 255).astype(np.uint8)
        else:
            img = image_tensor
        return img
    else:
        raise TypeError(f"Unsupported image type for SAM: {type(image_tensor)}")


class _SAMWrapperGPUOnlyFast:
    """
    FaceDetailer-compatible wrapper:
    - Stays on CUDA
    - Reuses a single SamPredictor
    - predict(image, points, plabs, bbox, threshold) -> list[HxW float32 CPU masks]
    """
    def __init__(self, model):
        self.model = model
        dev = comfy.model_management.get_torch_device()
        if "cuda" not in str(dev).lower():
            raise RuntimeError(
                f"[FaceDetailerStandalone] GPU-only SAM: CUDA device not available (got '{dev}')."
            )
        self._device = dev
        self.model.to(self._device).eval()

        from segment_anything import SamPredictor

        self._predictor = SamPredictor(self.model)

    def prepare_device(self):
        if "cuda" not in str(self._device).lower():
            raise RuntimeError("[FaceDetailerStandalone] CUDA device lost/unavailable for SAM.")

    def release_device(self):
        pass

    @torch.inference_mode()
    def predict(self, image, points, plabs, bbox, threshold: float):
        """
        image: Comfy IMAGE (NHWC, [0..1]) or numpy
        points: list[[x, y], ...] or None
        plabs: list[int] (1=fg, 0=bg) or None
        bbox: [x1, y1, x2, y2] or None
        threshold: float in [0..1]
        returns: list of HxW float32 CPU masks (0/1)
        """
        self.prepare_device()

        np_img = _to_numpy_rgb(image)

        try:
            self._predictor.set_image(np_img, "RGB")
        except TypeError:
            self._predictor.set_image(np_img)

        pc = np.array(points, dtype=np.float32) if points else None
        pl = np.array(plabs, dtype=np.int32) if plabs else None
        bx = np.array(bbox, dtype=np.float32) if bbox is not None else None

        masks, scores, _ = self._predictor.predict(
            point_coords=pc,
            point_labels=pl,
            box=bx,
            multimask_output=False,
        )

        out = []
        if masks is not None and scores is not None:
            for m, s in zip(masks, scores):
                if float(s) >= float(threshold):
                    if isinstance(m, torch.Tensor):
                        t = m.to(torch.float32).cpu()
                    else:
                        t = torch.from_numpy(m.astype(np.float32)).cpu()
                    out.append(t)
        return out


_CACHED_SAM_MODEL = None


def _get_embedded_sam():
    """Load SAM vit_b from SAM_CKPT_PATH and attach GPU-only fast wrapper, cached."""
    global _CACHED_SAM_MODEL
    if _CACHED_SAM_MODEL is not None:
        return _CACHED_SAM_MODEL

    if not os.path.isfile(SAM_CKPT_PATH):
        raise FileNotFoundError(
            f"[FaceDetailerStandalone] SAM checkpoint not found:\n {SAM_CKPT_PATH}\n"
            "Place 'sam_vit_b_01ec64_salia.pth' in the 'assets' folder next to this node."
        )

    try:
        from segment_anything import sam_model_registry
    except Exception as e:
        raise RuntimeError(
            "[FaceDetailerStandalone] 'segment_anything' is not installed for embedded SAM. "
            "Install in your Comfy python, e.g.: python -m pip install "
            "git+https://github.com/facebookresearch/segment-anything"
        ) from e

    sam = sam_model_registry['vit_b'](checkpoint=SAM_CKPT_PATH)
    sam.eval()

    wrapper = _SAMWrapperGPUOnlyFast(sam)
    sam.sam_wrapper = wrapper

    _CACHED_SAM_MODEL = sam
    return _CACHED_SAM_MODEL


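# Import Impact Pack's FaceDetailer.enhance_face once at module load; a failure is surfaced when the node runs.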
_ENHANCE_FACE = None
_IMPORT_ERR = None
try:
    from impact.impact_pack import FaceDetailer as _FD
    _ENHANCE_FACE = _FD.enhance_face
except Exception as _e:
    _IMPORT_ERR = _e
    _ENHANCE_FACE = None


class dn_04:
    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "image": ("IMAGE",),
                "model": ("MODEL", {"tooltip": "If `ImpactDummyInput` is connected to model, inference is skipped."}),
                "clip": ("CLIP",),
                "vae": ("VAE",),

                "sampler_name": (comfy.samplers.KSampler.SAMPLERS,),

                "positive": ("CONDITIONING",),
                "negative": ("CONDITIONING",),

                "seed": ("INT", {
                    "default": 0,
                    "min": 0,
                    "max": 0xffffffffffffffff,
                    "step": 1,
                    "control_after_generate": "fixed",
                }),
            },
            "optional": {},
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("image",)
    FUNCTION = "doit"
    CATEGORY = "ImpactPack/Standalone"
    DESCRIPTION = (
        "Face Detailer with the requested parameters hardcoded (non-editable), plus an embedded "
        "Ultralytics face bbox detector and an embedded SAM (no external input nodes). "
        "Optimized call path (cached imports + inference_mode) for lower overhead; "
        "results are identical to the Impact Pack Face Detailer at the same settings."
    )

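    # Everything except the image, models, conditioning, sampler_name, and seed is hardcoded below.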
    def doit(
        self,
        image, model, clip, vae,
        sampler_name,
        positive, negative,
        seed,
    ):
        if _ENHANCE_FACE is None:
            raise RuntimeError(
                "ComfyUI-Impact-Pack is required for Face Detailer logic. "
                "Please install/enable ComfyUI-Impact-Pack."
            ) from _IMPORT_ERR

        bbox_detector = _get_embedded_detector()
        sam_model_opt = _get_embedded_sam()

        enhance = _ENHANCE_FACE

        B = image.shape[0] if (hasattr(image, "shape") and image.ndim == 4) else 1

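        # Single images take a one-call fast path; batches are processed image-by-image below.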
        with torch.inference_mode():
            if B == 1:
                single = image[0] if image.ndim == 4 else image
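                # Hardcoded FaceDetailer settings (guide/max size, steps, cfg, scheduler, denoise,
                # feather, bbox/SAM options, drop size); only the inputs wired above are user-controlled.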
                enhanced_img, _, _, _, _ = enhance(
                    single.unsqueeze(0),
                    model, clip, vae,
                    512, True, 1024,
                    seed, 38, 7.0,
                    sampler_name, "simple",
                    positive, negative,
                    0.4, 5, True, True,
                    0.5, 10, 3.0,
                    "center-1", 0, 0.93, 0,
                    0.7, "False",
                    10, bbox_detector,
                    segm_detector=None, sam_model_opt=sam_model_opt,
                    wildcard_opt="", detailer_hook=None,
                    refiner_ratio=None, refiner_model=None, refiner_clip=None,
                    refiner_positive=None, refiner_negative=None,
                    cycle=1, inpaint_model=False,
                    noise_mask_feather=20,
                    scheduler_func_opt=None,
                    tiled_encode=False, tiled_decode=False,
                )
                return (enhanced_img,)

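            # Batch path: process each image separately with a per-image seed (seed + i).
            # Note: a few hardcoded values here (30 and 0.5) differ slightly from the single-image path (38 and 0.4).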
            out_imgs = []
            for i, single in enumerate(image.unbind(0)):
                enhanced_img, _, _, _, _ = enhance(
                    single.unsqueeze(0),
                    model, clip, vae,
                    512, True, 1024,
                    seed + i, 30, 7.0,
                    sampler_name, "simple",
                    positive, negative,
                    0.5, 5, True, True,
                    0.5, 10, 3.0,
                    "center-1", 0, 0.93, 0,
                    0.7, "False",
                    10, bbox_detector,
                    segm_detector=None, sam_model_opt=sam_model_opt,
                    wildcard_opt="", detailer_hook=None,
                    refiner_ratio=None, refiner_model=None, refiner_clip=None,
                    refiner_positive=None, refiner_negative=None,
                    cycle=1, inpaint_model=False,
                    noise_mask_feather=20,
                    scheduler_func_opt=None,
                    tiled_encode=False, tiled_decode=False,
                )
                out_imgs.append(enhanced_img)

            return (torch.cat(out_imgs, dim=0),)


NODE_CLASS_MAPPINGS = {
    "dn_04": dn_04,
}
NODE_DISPLAY_NAME_MAPPINGS = {
    "dn_04": "dn_04",
}