| """ |
| TSPoseDataSmoother — DWPose temporal smoothing and rendering node. |
| Recreated from original comfyui-teskors-utils by teskor-hub. |
| |
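| This node takes POSEDATA from PoseAndFaceDetection, applies exponential |
| moving average smoothing to the body and hand keypoints across frames, |
| and outputs rendered pose images together with the smoothed pose data. |
| Filtering out extra people is exposed as an option, but the filter is |
| currently a placeholder that keeps every detection. |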
| """ |
|
|
| import numpy as np |
| import torch |
| import copy |
| import logging |
|
|
| from comfy.utils import ProgressBar |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def _get_keypoints_array(meta, key): |
| """Extract keypoints array from an AAPoseMeta or dict-based meta.""" |
| if hasattr(meta, key): |
| kp = getattr(meta, key) |
| elif isinstance(meta, dict) and key in meta: |
| kp = meta[key] |
| else: |
| return None |
| if kp is None: |
| return None |
| if isinstance(kp, np.ndarray): |
| return kp.copy() |
| return np.array(kp, dtype=np.float32) |
|
|
|
|
| def _set_keypoints_array(meta, key, value): |
| """Set keypoints array back into meta.""" |
| if hasattr(meta, key): |
| setattr(meta, key, value) |
| elif isinstance(meta, dict): |
| meta[key] = value |
|
|
|
|
| def _ema_smooth(prev_kp, curr_kp, alpha, conf_thresh): |
| """ |
| Apply exponential moving average smoothing. |
| Only smooth keypoints that have confidence above threshold. |
| |
| prev_kp, curr_kp: numpy arrays of shape (N, 3) with [x, y, confidence] |
| alpha: smoothing factor (0-1), higher = more smoothing from current frame |
| conf_thresh: minimum confidence for a keypoint to be considered valid |
| """ |
| if prev_kp is None or curr_kp is None: |
| return curr_kp |
| if prev_kp.shape != curr_kp.shape: |
| return curr_kp |
| |
| smoothed = curr_kp.copy() |
| n_points = min(prev_kp.shape[0], curr_kp.shape[0]) |
| |
| for i in range(n_points): |
| |
| prev_conf = prev_kp[i, 2] if prev_kp.shape[1] > 2 else 1.0 |
| curr_conf = curr_kp[i, 2] if curr_kp.shape[1] > 2 else 1.0 |
| |
| if prev_conf >= conf_thresh and curr_conf >= conf_thresh: |
| |
| smoothed[i, :2] = alpha * curr_kp[i, :2] + (1 - alpha) * prev_kp[i, :2] |
| |
| |
| return smoothed |
|
|
|
|
| def _filter_to_primary_person(pose_metas, min_run_frames): |
| """ |
| When multiple people are detected, keep only the most prominent one. |
| Identifies the primary person based on bbox area and continuous presence. |
| Returns the filtered metas (list of same type). |
| """ |
| |
| |
| |
| |
| return pose_metas |
|
|
|
|
| def _smooth_pose_sequence(pose_metas, smooth_alpha, gap_frames, min_run_frames, |
| conf_thresh_body, conf_thresh_hands, filter_extra_people): |
| """ |
| Apply temporal smoothing to a sequence of pose meta data. |
| |
| Args: |
| pose_metas: list of AAPoseMeta objects or dicts |
| smooth_alpha: EMA blending factor (higher = favor current frame more) |
| gap_frames: max gap to interpolate across |
| min_run_frames: minimum consecutive frames for a valid detection run |
| conf_thresh_body: confidence threshold for body keypoints |
| conf_thresh_hands: confidence threshold for hand keypoints |
| filter_extra_people: whether to filter to single person |
| |
| Returns: |
| list of smoothed pose metas (deep copies) |
| """ |
| if not pose_metas: |
| return pose_metas |
| |
| |
| smoothed_metas = [] |
| for meta in pose_metas: |
| smoothed_metas.append(copy.deepcopy(meta)) |
| |
| if filter_extra_people: |
| smoothed_metas = _filter_to_primary_person(smoothed_metas, min_run_frames) |
| |
| |
| body_keys = ['keypoints_body'] |
| hand_keys = ['keypoints_lhand', 'keypoints_rhand'] |
| face_keys = ['keypoints_face'] |
| |
| prev_body = None |
| prev_lhand = None |
| prev_rhand = None |
| |
| gap_counter = 0 |
| |
| for i, meta in enumerate(smoothed_metas): |
| |
| curr_body = _get_keypoints_array(meta, 'keypoints_body') |
| if curr_body is not None: |
| if prev_body is not None and gap_counter <= gap_frames: |
| smoothed_body = _ema_smooth(prev_body, curr_body, smooth_alpha, conf_thresh_body) |
| _set_keypoints_array(meta, 'keypoints_body', smoothed_body) |
| prev_body = smoothed_body |
| else: |
| prev_body = curr_body |
| gap_counter = 0 |
| else: |
| gap_counter += 1 |
| |
| |
| curr_lhand = _get_keypoints_array(meta, 'keypoints_lhand') |
| if curr_lhand is not None and prev_lhand is not None: |
| smoothed_lhand = _ema_smooth(prev_lhand, curr_lhand, smooth_alpha, conf_thresh_hands) |
| _set_keypoints_array(meta, 'keypoints_lhand', smoothed_lhand) |
| prev_lhand = smoothed_lhand |
| elif curr_lhand is not None: |
| prev_lhand = curr_lhand |
| |
| |
| curr_rhand = _get_keypoints_array(meta, 'keypoints_rhand') |
| if curr_rhand is not None and prev_rhand is not None: |
| smoothed_rhand = _ema_smooth(prev_rhand, curr_rhand, smooth_alpha, conf_thresh_hands) |
| _set_keypoints_array(meta, 'keypoints_rhand', smoothed_rhand) |
| prev_rhand = smoothed_rhand |
| elif curr_rhand is not None: |
| prev_rhand = curr_rhand |
| |
| return smoothed_metas |
|
|
|
|
class TSPoseDataSmoother:
    """
    Smooths pose data across video frames using temporal EMA filtering.
    Reduces jitter/trembling in detected poses for smoother animation.

    The node returns the rendered pose frames as an IMAGE batch together
    with a copy of the pose data whose ``pose_metas`` are the smoothed ones.
    """

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "pose_data": ("POSEDATA",),
                "filter_extra_people": ("BOOLEAN", {
                    "default": True,
                    "tooltip": "Filter to keep only the primary detected person"
                }),
                "smooth_alpha": ("FLOAT", {
                    "default": 0.70,
                    "min": 0.0,
                    "max": 1.0,
                    "step": 0.01,
                    "tooltip": "EMA smoothing factor. Higher = more weight on current frame (less smoothing). Lower = more weight on previous frames (more smoothing)."
                }),
                "gap_frames": ("INT", {
                    "default": 12,
                    "min": 0,
                    "max": 120,
                    "step": 1,
                    "tooltip": "Maximum gap (in frames) to bridge when a detection is temporarily lost."
                }),
                "min_run_frames": ("INT", {
                    "default": 2,
                    "min": 1,
                    "max": 30,
                    "step": 1,
                    "tooltip": "Minimum consecutive frames a person must be detected to be considered valid."
                }),
                "conf_thresh_body": ("FLOAT", {
                    "default": 0.20,
                    "min": 0.0,
                    "max": 1.0,
                    "step": 0.01,
                    "tooltip": "Minimum confidence threshold for body keypoints to be smoothed."
                }),
                "conf_thresh_hands": ("FLOAT", {
                    "default": 0.50,
                    "min": 0.0,
                    "max": 1.0,
                    "step": 0.01,
                    "tooltip": "Minimum confidence threshold for hand keypoints to be smoothed."
                }),
            },
        }

    RETURN_TYPES = ("IMAGE", "POSEDATA")
    RETURN_NAMES = ("IMAGE", "pose_data")
    FUNCTION = "smooth"
    CATEGORY = "WanAnimatePreprocess"
    DESCRIPTION = "Smooths pose data across video frames using temporal EMA filtering to reduce jitter in detected poses."

    @staticmethod
    def _meta_dimension(meta, name, default=512):
        """Read an integer canvas dimension (attribute or dict item) from ``meta``."""
        value = getattr(meta, name, None)
        if value is None and isinstance(meta, dict):
            value = meta.get(name)
        if value is None:
            return default
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _load_draw_functions():
        """
        Try to import the renderer from a sibling ComfyUI-WanAnimatePreprocess checkout.

        Returns:
            ``(draw_fn, resize_fn)`` on success, ``(None, None)`` when the
            package directory does not exist or the import fails.
        """
        import os
        import sys

        # The sibling package is looked up next to this package's directory.
        custom_nodes_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        wan_preprocess_dir = os.path.join(custom_nodes_dir, "ComfyUI-WanAnimatePreprocess")
        if not os.path.exists(wan_preprocess_dir):
            return None, None

        sys.path.insert(0, wan_preprocess_dir)
        try:
            from pose_utils.human_visualization import draw_aapose_by_meta_new
            from utils import padding_resize
        except ImportError as e:
            logger.warning(f"TSPoseDataSmoother: Could not import drawing functions: {e}")
            return None, None
        finally:
            # Always undo the temporary sys.path entry, also when the import
            # failed. Removing by value stays correct even if the imported
            # modules pushed their own entries in front of ours.
            try:
                sys.path.remove(wan_preprocess_dir)
            except ValueError:
                pass

        return draw_aapose_by_meta_new, padding_resize

    def smooth(self, pose_data, filter_extra_people, smooth_alpha, gap_frames,
               min_run_frames, conf_thresh_body, conf_thresh_hands):
        """
        Smooth the poses in ``pose_data`` and render one image per frame.

        Args:
            pose_data: dict carrying ``pose_metas`` and, optionally,
                ``pose_metas_original``.
            filter_extra_people: whether to run the primary-person filter.
            smooth_alpha: EMA weight of the current frame.
            gap_frames: maximum detection gap (in frames) to bridge.
            min_run_frames: minimum detection run length for the filter.
            conf_thresh_body: confidence threshold for body keypoints.
            conf_thresh_hands: confidence threshold for hand keypoints.

        Returns:
            ``(images, pose_data)``: a float tensor of rendered frames scaled
            to [0, 1], and a shallow copy of ``pose_data`` whose
            ``pose_metas`` entry holds the smoothed metas. When there are no
            pose metas, a single black 64x64 frame and the unchanged input
            are returned.
        """
        pose_metas = pose_data.get("pose_metas", [])
        pose_metas_original = pose_data.get("pose_metas_original", [])

        if not pose_metas:
            logger.warning("TSPoseDataSmoother: No pose_metas found in pose_data")
            return (torch.zeros(1, 64, 64, 3), pose_data)

        # The canvas size comes from the first available meta, preferring the
        # original metas when present.
        first_meta = pose_metas_original[0] if pose_metas_original else pose_metas[0]
        width = self._meta_dimension(first_meta, 'width')
        height = self._meta_dimension(first_meta, 'height')

        smoothed_metas = _smooth_pose_sequence(
            pose_metas,
            smooth_alpha=smooth_alpha,
            gap_frames=gap_frames,
            min_run_frames=min_run_frames,
            conf_thresh_body=conf_thresh_body,
            conf_thresh_hands=conf_thresh_hands,
            filter_extra_people=filter_extra_people,
        )

        draw_fn, resize_fn = self._load_draw_functions()

        comfy_pbar = ProgressBar(len(smoothed_metas))
        pose_images = []

        for idx, meta in enumerate(smoothed_metas):
            canvas = np.zeros((height, width, 3), dtype=np.uint8)

            if draw_fn is None:
                # External renderer unavailable: use the built-in skeleton drawer.
                pose_image = _fallback_draw_pose(canvas, meta, height, width)
            else:
                try:
                    pose_image = draw_fn(canvas, meta, draw_hand=True, draw_head=True)
                except Exception as e:
                    logger.warning(f"TSPoseDataSmoother: Drawing failed on frame {idx}: {e}")
                    pose_image = canvas
                else:
                    # Resizing is best-effort: keep the drawn frame if it fails.
                    try:
                        pose_image = resize_fn(pose_image, height, width)
                    except Exception as e:
                        logger.debug(f"TSPoseDataSmoother: Resize skipped on frame {idx}: {e}")

            pose_images.append(pose_image)
            if (idx + 1) % 10 == 0:
                comfy_pbar.update_absolute(idx + 1)

        comfy_pbar.update_absolute(len(smoothed_metas))

        pose_images_np = np.stack(pose_images, 0)
        pose_images_tensor = torch.from_numpy(pose_images_np).float() / 255.0

        # Shallow copy: only the pose_metas entry is replaced.
        smoothed_pose_data = dict(pose_data)
        smoothed_pose_data["pose_metas"] = smoothed_metas

        return (pose_images_tensor, smoothed_pose_data)
|
|
|
|
def _fallback_draw_pose(canvas, meta, height, width):
    """
    Simple fallback pose renderer when ComfyUI-WanAnimatePreprocess
    drawing functions are not available.

    Draws the body skeleton (green limbs, filled joint dots) onto ``canvas``
    in place, using at most the first 17 body keypoints. Hands and face are
    not drawn.

    Args:
        canvas: uint8 image array to draw on.
        meta: pose meta holding ``keypoints_body`` rows of [x, y] or
            [x, y, confidence].
        height: canvas height, used to scale normalized y coordinates.
        width: canvas width, used to scale normalized x coordinates.

    Returns:
        The same ``canvas`` object. It is returned untouched when there are
        no usable body keypoints.
    """
    kp_body = _get_keypoints_array(meta, 'keypoints_body')
    if kp_body is None or kp_body.ndim != 2 or kp_body.shape[1] < 2:
        return canvas

    # Imported only once there is something to draw, so frames without body
    # keypoints do not require OpenCV.
    import cv2

    body_connections = (
        (0, 1), (0, 2), (1, 3), (2, 4),
        (5, 6), (5, 7), (7, 9), (6, 8), (8, 10),
        (5, 11), (6, 12), (11, 12),
        (11, 13), (13, 15), (12, 14), (14, 16),
    )
    max_joints = 17
    min_conf = 0.1
    has_conf = kp_body.shape[1] > 2

    def to_pixel(row):
        """Return the (x, y) pixel position of a keypoint row, or None if it must not be drawn."""
        conf = row[2] if has_conf else 1.0
        if not conf > min_conf:
            return None
        x, y = row[0], row[1]
        # Undetected points may carry NaN/inf; int() would raise on them.
        if not (np.isfinite(x) and np.isfinite(y)):
            return None
        # Coordinates <= 1.0 are treated as normalized and scaled to the canvas.
        px = int(x * width) if x <= 1.0 else int(x)
        py = int(y * height) if y <= 1.0 else int(y)
        return px, py

    joints = [to_pixel(kp_body[i]) for i in range(min(len(kp_body), max_joints))]

    # Limbs first so the joint dots end up on top of them.
    for start, end in body_connections:
        if start < len(joints) and end < len(joints):
            p1, p2 = joints[start], joints[end]
            if p1 is not None and p2 is not None:
                cv2.line(canvas, p1, p2, (0, 255, 0), 2)

    for point in joints:
        if point is not None:
            cv2.circle(canvas, point, 3, (0, 0, 255), -1)

    return canvas
|
|
|
|
| |
# Node registration tables: internal node identifier -> implementing class.
NODE_CLASS_MAPPINGS = {
    "TSPoseDataSmoother": TSPoseDataSmoother,
}

# Internal node identifier -> human-readable display title.
NODE_DISPLAY_NAME_MAPPINGS = {
    "TSPoseDataSmoother": "TS Pose Data Smoother",
}
|
|