|
|
import os
|
|
|
import logging
|
|
|
from typing import List, Tuple
|
|
|
|
|
|
import torch
|
|
|
import numpy as np
|
|
|
from ultralytics import YOLO
|
|
|
|
|
|
|
|
|
import impact.core as core
|
|
|
from impact.core import SEG
|
|
|
|
|
|
|
|
|
try:
|
|
|
import tensorrt as trt
|
|
|
except Exception:
|
|
|
trt = None
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
from .utils_salia import (
|
|
|
NODE_DIR,
|
|
|
IMGSZ,
|
|
|
list_local_pt_files,
|
|
|
tensor_to_pil,
|
|
|
make_crop_region,
|
|
|
crop_image,
|
|
|
crop_ndarray2,
|
|
|
dilate_mask,
|
|
|
)
|
|
|
except ImportError:
|
|
|
|
|
|
from utils_salia import (
|
|
|
NODE_DIR,
|
|
|
IMGSZ,
|
|
|
list_local_pt_files,
|
|
|
tensor_to_pil,
|
|
|
make_crop_region,
|
|
|
crop_image,
|
|
|
crop_ndarray2,
|
|
|
dilate_mask,
|
|
|
)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TRTYOLOBBoxDetector:
    """
    BBOX_DETECTOR interface compatible with Impact Pack / FaceDetailer.

    Wraps a callable YOLO model and turns its box predictions into SEG items.

    Methods required:
    - setAux(x)
    - detect(image, threshold, dilation, crop_factor, drop_size=1, detailer_hook=None)
    - detect_combined(image, threshold, dilation)
    """

    def __init__(self, yolo_model: YOLO, device: str = "0"):
        """
        Store the model and normalise the device selector.

        Args:
            yolo_model: model object; it is called directly to run inference.
            device: device selector forwarded to the model on every call.
                "0", "cuda" and "cuda:0" are all stored as "0"; any other
                value is stored as ``str(device)``.
        """
        self.bbox_model = yolo_model
        self.device = "0" if device in ("0", "cuda", "cuda:0") else str(device)

        # Opaque value supplied through setAux(); this class only stores it.
        self.aux = None

    def setAux(self, x):
        """Store ``x`` on the detector as ``self.aux``."""
        self.aux = x

    def detect(
        self,
        image: torch.Tensor,
        threshold: float,
        dilation: int,
        crop_factor: float,
        drop_size: int = 1,
        detailer_hook=None,
    ) -> Tuple[Tuple[int, int], List[SEG]]:
        """
        Main detection method used by FaceDetailer.

        Args:
            image: ComfyUI IMAGE tensor [B, H, W, C] in 0..1
            threshold: confidence threshold passed to the model as ``conf``
            dilation: mask dilation in pixels (0 skips dilation)
            crop_factor: expansion factor for bbox when computing crop_region
            drop_size: boxes whose width or height is <= this value are skipped
            detailer_hook: optional hook with post_crop_region / post_detection

        Returns:
            SEGS tuple: ( (H, W), [SEG, SEG, ...] )

        Raises:
            ValueError: if ``image`` is not 4-dimensional or its batch is empty.
        """
        if image.dim() != 4:
            raise ValueError(
                "[TRTYOLOBBoxDetector] Expected IMAGE tensor with 4 dims [B, H, W, C]."
            )

        batch_size = int(image.shape[0])
        if batch_size == 0:
            # An empty batch used to fall into the "Batch > 1" branch and then
            # continue with an empty tensor; fail early with a clear message.
            raise ValueError(
                "[TRTYOLOBBoxDetector] Received an empty IMAGE batch (B == 0); nothing to detect."
            )
        if batch_size > 1:
            logger.warning(
                "[TRTYOLOBBoxDetector] Batch > 1 detected, using only the first image for detection."
            )
            image = image[:1]

        h, w = int(image.shape[1]), int(image.shape[2])
        shape = (h, w)

        pil_img = tensor_to_pil(image)

        pred_list = self.bbox_model(
            pil_img,
            conf=threshold,
            device=self.device,
            verbose=False,
        )

        # NOTE: the two early returns below bypass detailer_hook.post_detection.
        if len(pred_list) == 0:
            return (shape, [])

        pred = pred_list[0]
        boxes = pred.boxes
        if boxes is None or boxes.xyxy is None or boxes.xyxy.shape[0] == 0:
            return (shape, [])

        xyxy = boxes.xyxy.cpu().numpy()
        confs = boxes.conf.cpu().numpy()
        clses = boxes.cls.cpu().numpy().astype(int)
        names = pred.names

        has_crop_hook = detailer_hook is not None and hasattr(
            detailer_hook, "post_crop_region"
        )

        seg_items: List[SEG] = []

        for (x1, y1, x2, y2), conf_value, cls_value in zip(xyxy, confs, clses):
            score = float(conf_value)
            cls_id = int(cls_value)
            # Fall back to the numeric id when the model has no name for it.
            label = names.get(cls_id, str(cls_id))

            # Skip boxes that are too small in either dimension.
            if (x2 - x1) <= drop_size or (y2 - y1) <= drop_size:
                continue

            # Snap outward to whole pixels and clamp to the image bounds.
            x1_i = max(int(np.floor(x1)), 0)
            y1_i = max(int(np.floor(y1)), 0)
            x2_i = min(int(np.ceil(x2)), w)
            y2_i = min(int(np.ceil(y2)), h)
            if x2_i <= x1_i or y2_i <= y1_i:
                continue

            # Full-frame rectangular mask: 255 inside the box, 0 elsewhere.
            mask = np.zeros((h, w), dtype=np.uint8)
            mask[y1_i:y2_i, x1_i:x2_i] = 255

            if dilation != 0:
                mask = dilate_mask(mask, dilation)

            # The SEG keeps the unrounded box coordinates.
            item_bbox = [float(x1), float(y1), float(x2), float(y2)]

            crop_region = make_crop_region(w, h, item_bbox, crop_factor)
            if has_crop_hook:
                crop_region = detailer_hook.post_crop_region(
                    w, h, item_bbox, crop_region
                )

            cropped_image = crop_image(image, crop_region)
            cropped_mask = crop_ndarray2(mask, crop_region)

            seg_items.append(
                SEG(
                    cropped_image,
                    cropped_mask,
                    score,
                    crop_region,
                    item_bbox,
                    label,
                    None,
                )
            )

        segs = (shape, seg_items)

        if detailer_hook is not None and hasattr(detailer_hook, "post_detection"):
            segs = detailer_hook.post_detection(segs)

        return segs

    def detect_combined(
        self,
        image: torch.Tensor,
        threshold: float,
        dilation: int,
    ) -> torch.Tensor:
        """
        Optional helper API: returns a combined MASK of all detections.

        Runs detect() with crop_factor=1.0, drop_size=1 and no hook, then
        passes the resulting SEGS to core.segs_to_combined_mask.
        """
        segs = self.detect(
            image=image,
            threshold=threshold,
            dilation=dilation,
            crop_factor=1.0,
            drop_size=1,
            detailer_hook=None,
        )
        return core.segs_to_combined_mask(segs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TRTYOLOEngineBuilder:
    """
    Node that exports a local YOLO ``.pt`` file to a TensorRT engine and
    returns the path of the resulting engine file.
    """

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("engine_path",)
    FUNCTION = "build"
    CATEGORY = "ImpactPack/TensorRT"

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the single input: a choice among the locally listed .pt files."""
        # Fall back to a lone "face.pt" entry when no file is listed.
        choices = list_local_pt_files() or ["face.pt"]

        model_options = {
            "default": choices[0],
            "tooltip": (
                "Select a YOLO .pt file that lives in the SAME folder as this node file.\n"
                "Example: 'face.pt' next to TensorRTBBoxDetector.py"
            ),
        }
        return {"required": {"pt_model_name": (choices, model_options)}}

    def _check_tensorrt_available(self):
        """
        Preflight check that TensorRT is importable and can create a Builder.

        Raises:
            RuntimeError: if the ``tensorrt`` import failed at module load, or
                if creating a Logger/Builder raises (original error chained).
        """
        if trt is None:
            raise RuntimeError(
                "[TRTYOLOEngineBuilder] TensorRT Python package is not available. "
                "Install it via pip (cu12 build) or use an image with TensorRT preinstalled."
            )

        try:
            trt_log = trt.Logger(trt.Logger.ERROR)
            probe = trt.Builder(trt_log)
            # The builder is only a probe; drop it immediately.
            del probe
        except Exception as e:
            raise RuntimeError(
                "[TRTYOLOEngineBuilder] TensorRT failed to initialize. "
                "Check that your CUDA driver, CUDA runtime, and TensorRT versions match. "
                f"Original error: {e}"
            ) from e

    def build(self, pt_model_name: str):
        """
        Export ``pt_model_name`` (resolved against NODE_DIR) to a TensorRT engine.

        Args:
            pt_model_name: file name of the .pt model, joined onto NODE_DIR.

        Returns:
            One-element tuple holding the engine file path as a string.

        Raises:
            FileNotFoundError: if the .pt file is missing, or if no engine
                file exists at the reported path after export.
            RuntimeError: if the TensorRT preflight check fails.
        """
        pt_path = os.path.join(NODE_DIR, pt_model_name)
        if not os.path.isfile(pt_path):
            raise FileNotFoundError(
                f"[TRTYOLOEngineBuilder] .pt model not found next to this node: {pt_path}"
            )

        self._check_tensorrt_available()

        logger.info(
            f"[TRTYOLOEngineBuilder] Exporting TensorRT engine from '{pt_path}' "
            f"with imgsz={IMGSZ} (H,W), batch=1, half=True, device='0', exist_ok=True"
        )

        # Arguments common to both export attempts.
        export_args = {
            "format": "engine",
            "imgsz": IMGSZ,
            "half": True,
            "device": "0",
        }
        try:
            exported = YOLO(pt_path).export(**export_args, exist_ok=True)
        except TypeError:
            # Retry without exist_ok when the first attempt raises TypeError.
            exported = YOLO(pt_path).export(**export_args)

        engine_path = str(exported)

        # A relative result is re-anchored at NODE_DIR, but only if that file exists.
        local_candidate = os.path.join(NODE_DIR, engine_path)
        if not os.path.isabs(engine_path) and os.path.isfile(local_candidate):
            engine_path = local_candidate

        if not os.path.isfile(engine_path):
            raise FileNotFoundError(
                f"[TRTYOLOEngineBuilder] Export completed but engine file not found at: {engine_path}"
            )

        logger.info(f"[TRTYOLOEngineBuilder] Export done. Engine path: {engine_path}")

        return (engine_path,)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TRTYOLOBBoxDetectorProvider:
    """
    Node that loads a TensorRT ``.engine`` file with YOLO and wraps it in a
    TRTYOLOBBoxDetector, returned as a BBOX_DETECTOR output.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the single input: the engine path as a one-line string."""
        return {
            "required": {
                "engine_path": (
                    "STRING",
                    {
                        "default": "",
                        "multiline": False,
                        "tooltip": (
                            "Path to .engine file.\n"
                            "Can be an absolute path OR a path RELATIVE to this node's folder.\n"
                            "Typically, you connect this from TRTYOLOEngineBuilder."
                        ),
                    },
                ),
            }
        }

    RETURN_TYPES = ("BBOX_DETECTOR",)
    RETURN_NAMES = ("bbox_detector",)
    FUNCTION = "load"
    CATEGORY = "ImpactPack/TensorRT"

    def load(self, engine_path: str):
        """
        Load the engine at ``engine_path`` and build a detector on device "0".

        Args:
            engine_path: absolute path, or a path relative to NODE_DIR.
                Surrounding whitespace is ignored.

        Returns:
            One-element tuple holding the TRTYOLOBBoxDetector.

        Raises:
            ValueError: if ``engine_path`` is empty, None, or only whitespace.
            FileNotFoundError: if the resolved path is not an existing file.
        """
        # Strip BEFORE the emptiness check so a whitespace-only value is
        # reported as "empty" rather than as a missing file in NODE_DIR.
        engine_path = (engine_path or "").strip()
        if not engine_path:
            raise ValueError(
                "[TRTYOLOBBoxDetectorProvider] engine_path is empty. "
                "Connect the output from TRTYOLOEngineBuilder or type a path."
            )

        if not os.path.isabs(engine_path):
            engine_path = os.path.join(NODE_DIR, engine_path)

        if not os.path.isfile(engine_path):
            raise FileNotFoundError(
                f"[TRTYOLOBBoxDetectorProvider] Engine file not found: {engine_path}"
            )

        logger.info(
            "[TRTYOLOBBoxDetectorProvider] Loading YOLO TensorRT engine from '%s' on device '0'",
            engine_path,
        )

        yolo_model = YOLO(engine_path)
        detector = TRTYOLOBBoxDetector(yolo_model, device="0")

        return (detector,)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Maps each node identifier to its implementing class. The identifiers are
# exactly the class names, so they are derived from ``__name__``.
NODE_CLASS_MAPPINGS = {
    node_cls.__name__: node_cls
    for node_cls in (TRTYOLOEngineBuilder, TRTYOLOBBoxDetectorProvider)
}
|
|
|
|
|
|
# Display label for each node identifier; the keys match those used in
# NODE_CLASS_MAPPINGS.
NODE_DISPLAY_NAME_MAPPINGS = dict(
    TRTYOLOEngineBuilder="TensorRT YOLO Engine Builder (1344x768, local .pt)",
    TRTYOLOBBoxDetectorProvider="TensorRT YOLO BBox Detector",
)
|
|
|
|