from __future__ import annotations import copy import math import pickle import threading from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np import cv2 import torch # ============================================================ # ComfyUI Node (pose_data + PKL) # ============================================================ _GLOBAL_LOCK = threading.Lock() class KPSSmoothPoseDataAndRender: """ Сглаживание + рендер позы. Вход: POSEDATA (как объект/dict; обычно приходит из TSLoadPoseDataPickle). Выход: IMAGE (torch [T,H,W,3] float 0..1), POSEDATA (в том же формате, но сглаженный). """ @classmethod def INPUT_TYPES(cls): return { "required": { "pose_data": ("POSEDATA",), # <-- ВАЖНО: именно POSEDATA "filter_extra_people": ("BOOLEAN", {"default": True}), # общий набор параметров сглаживания (вместо body + face_hands) "smooth_alpha": ("FLOAT", {"default": 0.7, "min": 0.01, "max": 0.99, "step": 0.01}), "gap_frames": ("INT", {"default": 12, "min": 0, "max": 100, "step": 1}), "min_run_frames": ("INT", {"default": 2, "min": 1, "max": 60, "step": 1}), # пороги отрисовки (в инпут добавляем body/hands, face НЕ добавляем) "conf_thresh_body": ("FLOAT", {"default": 0.20, "min": 0.0, "max": 1.0, "step": 0.01}), "conf_thresh_hands": ("FLOAT", {"default": 0.50, "min": 0.0, "max": 1.0, "step": 0.01}), } } RETURN_TYPES = ("IMAGE", "POSEDATA") # <-- ВАЖНО: именно POSEDATA RETURN_NAMES = ("IMAGE", "pose_data") FUNCTION = "run" CATEGORY = "posedata" def run(self, pose_data, **kwargs): filter_extra_people = bool(kwargs.get("filter_extra_people", True)) # общий набор smooth_alpha = float(kwargs.get("smooth_alpha", 0.7)) gap_frames = int(kwargs.get("gap_frames", 12)) min_run_frames = int(kwargs.get("min_run_frames", 2)) # пороги рендера conf_thresh_body = float(kwargs.get("conf_thresh_body", 0.20)) conf_thresh_hands = float(kwargs.get("conf_thresh_hands", 0.50)) conf_thresh_face = 0.20 # <- НЕ добавляем в INPUT, но фиксируем как ты просил force_body_18 = bool(kwargs.get("force_body_18", False)) pose_data = _coerce_pose_data_to_obj(pose_data) # pose_data -> frames_json_like frames_json_like, meta_ref = _pose_data_to_kps_frames(pose_data, force_body_18=force_body_18) with _GLOBAL_LOCK: old = _snapshot_tunable_globals() try: # BODY globals()["ALPHA_BODY"] = smooth_alpha globals()["SUPER_SMOOTH_ALPHA"] = smooth_alpha globals()["MAX_GAP_FRAMES"] = gap_frames globals()["MIN_RUN_FRAMES"] = min_run_frames # FACE+HANDS (dense) тоже от общего набора globals()["DENSE_SUPER_SMOOTH_ALPHA"] = smooth_alpha globals()["DENSE_MAX_GAP_FRAMES"] = gap_frames globals()["DENSE_MIN_RUN_FRAMES"] = min_run_frames globals()["FILTER_EXTRA_PEOPLE"] = filter_extra_people smoothed_frames = smooth_KPS_json_obj( frames_json_like, keep_face_untouched=False, keep_hands_untouched=False, filter_extra_people=filter_extra_people, ) finally: _restore_tunable_globals(old) # frames_json_like -> pose_data (обратно в pose_metas) out_pose_data = _kps_frames_to_pose_data(pose_data, smoothed_frames, meta_ref, force_body_18=force_body_18) # render w, h = _extract_canvas_wh(smoothed_frames, default_w=720, default_h=1280) frames_np = [] for fr in smoothed_frames: if isinstance(fr, dict) and fr.get("people"): img = _draw_pose_frame_full( w, h, fr["people"][0], conf_thresh_body=conf_thresh_body, conf_thresh_hands=conf_thresh_hands, conf_thresh_face=conf_thresh_face, ) else: img = np.zeros((h, w, 3), dtype=np.uint8) frames_np.append(img) frames_t = torch.from_numpy(np.stack(frames_np, axis=0)).float() / 255.0 return (frames_t, out_pose_data) # ============================================================ # PKL / pose_data IO # ============================================================ class _PoseDummyObj: def __init__(self, *a, **k): pass def __setstate__(self, state): # поддержка dict и (dict, slotstate) if isinstance(state, dict): self.__dict__.update(state) elif isinstance(state, (list, tuple)) and len(state) == 2 and isinstance(state[0], dict): self.__dict__.update(state[0]) if isinstance(state[1], dict): self.__dict__.update(state[1]) else: self.__dict__["_slotstate"] = state[1] else: self.__dict__["_state"] = state class _SafeUnpickler(pickle.Unpickler): """ Безопасно грузим PKL из ComfyUI окружения: - ремап numpy._core -> numpy.core - неизвестные классы (WanAnimatePreprocess.*) превращаем в простые объекты с __dict__ """ def find_class(self, module, name): # ремап внутренних путей numpy (частая проблема между версиями) if module.startswith("numpy._core"): module = module.replace("numpy._core", "numpy.core", 1) if module.startswith("numpy._globals"): module = module.replace("numpy._globals", "numpy", 1) # конкретные классы метаданных (если встречаются) if name in {"AAPoseMeta"}: return _PoseDummyObj try: return super().find_class(module, name) except Exception: return _PoseDummyObj def _load_pose_data_pkl(path: str) -> Any: with open(path, "rb") as f: return _SafeUnpickler(f).load() def _coerce_pose_data_to_obj(pd: Any) -> Any: """ Accepts: - dict pose_data - object with attributes like .pose_metas (AAPoseMeta-like) - str path to .pkl - dict wrapper with 'pose_data' """ if isinstance(pd, str): obj = _load_pose_data_pkl(pd) return obj if isinstance(pd, dict) and "pose_data" in pd: return pd["pose_data"] return pd # ============================================================ # pose_data <-> JSON-like KPS frames # ============================================================ def _as_attr(x: Any, key: str, default=None): if isinstance(x, dict): return x.get(key, default) return getattr(x, key, default) def _set_attr(x: Any, key: str, value: Any): if isinstance(x, dict): x[key] = value else: setattr(x, key, value) def _xy_p_to_flat(xy: Optional[np.ndarray], p: Optional[np.ndarray]) -> Optional[List[float]]: if xy is None: return None arr = np.asarray(xy) if arr.ndim != 2 or arr.shape[1] < 2: return None N = arr.shape[0] if p is None: pp = np.ones((N,), dtype=np.float32) else: pp = np.asarray(p).reshape(-1) if pp.shape[0] != N: # если вдруг не совпали — подстрахуемся pp = np.ones((N,), dtype=np.float32) out: List[float] = [] for i in range(N): out.extend([float(arr[i, 0]), float(arr[i, 1]), float(pp[i])]) return out def _flat_to_xy_p(flat: Optional[List[float]]) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: if not isinstance(flat, list) or len(flat) % 3 != 0: return None, None N = len(flat) // 3 xy = np.zeros((N, 2), dtype=np.float32) p = np.zeros((N,), dtype=np.float32) for i in range(N): xy[i, 0] = float(flat[3 * i + 0]) xy[i, 1] = float(flat[3 * i + 1]) p[i] = float(flat[3 * i + 2]) return xy, p def _pose_data_to_kps_frames(pose_data: Any, *, force_body_18: bool) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: """ Делает "как JSON" список кадров: frame = {"people":[{pose_keypoints_2d, face_keypoints_2d, hand_left_keypoints_2d, hand_right_keypoints_2d}], "canvas_width": W, "canvas_height": H} meta_ref: ссылки на pose_metas + тип/доступ, чтобы правильно записать обратно. """ pose_metas = _as_attr(pose_data, "pose_metas", None) if pose_metas is None: # иногда называют иначе pose_metas = _as_attr(pose_data, "frames", None) if pose_metas is None or not isinstance(pose_metas, list): raise ValueError("pose_data does not contain 'pose_metas' list.") frames: List[Dict[str, Any]] = [] for meta in pose_metas: h = _as_attr(meta, "height", 1280) w = _as_attr(meta, "width", 720) kps_body = _as_attr(meta, "kps_body", None) kps_body_p = _as_attr(meta, "kps_body_p", None) kps_face = _as_attr(meta, "kps_face", None) kps_face_p = _as_attr(meta, "kps_face_p", None) kps_lhand = _as_attr(meta, "kps_lhand", None) kps_lhand_p = _as_attr(meta, "kps_lhand_p", None) kps_rhand = _as_attr(meta, "kps_rhand", None) kps_rhand_p = _as_attr(meta, "kps_rhand_p", None) # to flat pose_flat = _xy_p_to_flat(kps_body, kps_body_p) face_flat = _xy_p_to_flat(kps_face, kps_face_p) lh_flat = _xy_p_to_flat(kps_lhand, kps_lhand_p) rh_flat = _xy_p_to_flat(kps_rhand, kps_rhand_p) if force_body_18 and isinstance(pose_flat, list) and len(pose_flat) >= 18 * 3: pose_flat = pose_flat[: 18 * 3] person = { "pose_keypoints_2d": pose_flat if pose_flat is not None else [], "face_keypoints_2d": face_flat if face_flat is not None else [], "hand_left_keypoints_2d": lh_flat, "hand_right_keypoints_2d": rh_flat, } frame = {"people": [person], "canvas_height": int(h), "canvas_width": int(w)} frames.append(frame) meta_ref = { "pose_metas": pose_metas, "len": len(pose_metas), } return frames, meta_ref def _kps_frames_to_pose_data( pose_data_in: Any, frames_kps: List[Dict[str, Any]], meta_ref: Dict[str, Any], *, force_body_18: bool, ) -> Any: """ Записывает обратно сглаженные keypoints в pose_metas[*].kps_* / kps_*_p. Остальные поля pose_data сохраняем. """ out_pd = copy.deepcopy(pose_data_in) pose_metas_out = _as_attr(out_pd, "pose_metas", None) if pose_metas_out is None: # fallback: вдруг другой ключ pose_metas_out = meta_ref.get("pose_metas") if pose_metas_out is None or not isinstance(pose_metas_out, list): raise ValueError("Failed to locate pose_metas in output pose_data.") T = min(len(pose_metas_out), len(frames_kps)) for t in range(T): meta = pose_metas_out[t] fr = frames_kps[t] people = fr.get("people", []) if isinstance(fr, dict) else [] p0 = people[0] if people else None if not isinstance(p0, dict): continue pose_flat = p0.get("pose_keypoints_2d") face_flat = p0.get("face_keypoints_2d") lh_flat = p0.get("hand_left_keypoints_2d") rh_flat = p0.get("hand_right_keypoints_2d") if force_body_18 and isinstance(pose_flat, list) and len(pose_flat) >= 18 * 3: pose_flat = pose_flat[: 18 * 3] body_xy, body_p = _flat_to_xy_p(pose_flat if isinstance(pose_flat, list) else None) face_xy, face_p = _flat_to_xy_p(face_flat if isinstance(face_flat, list) else None) lh_xy, lh_p = _flat_to_xy_p(lh_flat if isinstance(lh_flat, list) else None) rh_xy, rh_p = _flat_to_xy_p(rh_flat if isinstance(rh_flat, list) else None) if body_xy is not None and body_p is not None: _set_attr(meta, "kps_body", body_xy.astype(np.float32, copy=False)) _set_attr(meta, "kps_body_p", body_p.astype(np.float32, copy=False)) if face_xy is not None and face_p is not None: _set_attr(meta, "kps_face", face_xy.astype(np.float32, copy=False)) _set_attr(meta, "kps_face_p", face_p.astype(np.float32, copy=False)) if lh_xy is not None and lh_p is not None: _set_attr(meta, "kps_lhand", lh_xy.astype(np.float32, copy=False)) _set_attr(meta, "kps_lhand_p", lh_p.astype(np.float32, copy=False)) if rh_xy is not None and rh_p is not None: _set_attr(meta, "kps_rhand", rh_xy.astype(np.float32, copy=False)) _set_attr(meta, "kps_rhand_p", rh_p.astype(np.float32, copy=False)) # обновим width/height если нужно if isinstance(fr, dict): if "canvas_width" in fr: _set_attr(meta, "width", int(fr["canvas_width"])) if "canvas_height" in fr: _set_attr(meta, "height", int(fr["canvas_height"])) # обязательно положим pose_metas обратно _set_attr(out_pd, "pose_metas", pose_metas_out) return out_pd def _extract_canvas_wh(data: Any, default_w: int, default_h: int) -> Tuple[int, int]: w, h = int(default_w), int(default_h) if isinstance(data, list): for fr in data: if isinstance(fr, dict) and "canvas_width" in fr and "canvas_height" in fr: try: w = int(fr["canvas_width"]) h = int(fr["canvas_height"]) break except Exception: pass return w, h # ============================================================ # === START: smooth_KPS_json.py logic (ported as-is) # ============================================================ # --- Root+Scale carry (when torso disappears on close-up) --- ROOTSCALE_CARRY_ENABLED = True CARRY_MAX_FRAMES = 48 CARRY_MIN_ANCHORS = 2 CARRY_ANCHOR_JOINTS = [0, 1, 2, 5, 3, 6, 4, 7] CARRY_CONF_GATE = 0.20 # --- Main person selection / multi-person filtering --- FILTER_EXTRA_PEOPLE = True MAIN_PERSON_MODE = "longest_track" TRACK_MATCH_MIN_PX = 80.0 TRACK_MATCH_FACTOR = 3.0 TRACK_MAX_FRAME_GAP = 32 # --- Spatial outlier suppression --- SPATIAL_OUTLIER_FIX = True BONE_MAX_FACTOR = 2.3 TORSO_RADIUS_FACTOR = 4.0 # EMA smoothing for BODY only (online) ALPHA_BODY = 0.70 MAX_STEP_BODY = 60.0 VEL_ALPHA = 0.45 EPS = 0.3 CONF_GATE_BODY = 0.20 CONF_FLOOR_BODY = 0.00 TRACK_DIST_PENALTY = 1.5 FACE_WEIGHT_IN_SCORE = 0.15 HAND_WEIGHT_IN_SCORE = 0.35 ALLOW_DISAPPEAR_JOINTS = {3, 4, 6, 7} GAP_FILL_ENABLED = True MAX_GAP_FRAMES = 12 MIN_RUN_FRAMES = 2 TORSO_SYNC_ENABLED = True TORSO_JOINTS = {1, 2, 5, 8, 11} TORSO_LOOKAHEAD_FRAMES = 32 SUPER_SMOOTH_ENABLED = True SUPER_SMOOTH_ALPHA = 0.7 SUPER_SMOOTH_MIN_CONF = 0.20 MEDIAN3_ENABLED = True FACE_SMOOTH_ENABLED = True HANDS_SMOOTH_ENABLED = False CONF_GATE_FACE = 0.20 CONF_GATE_HAND = 0.50 HAND_MIN_POINTS_PRESENT = 7 MIN_HAND_RUN_FRAMES = 6 DENSE_GAP_FILL_ENABLED = False DENSE_MAX_GAP_FRAMES = 8 DENSE_MIN_RUN_FRAMES = 2 DENSE_MEDIAN3_ENABLED = False DENSE_SUPER_SMOOTH_ENABLED = False DENSE_SUPER_SMOOTH_ALPHA = 0.7 def _snapshot_tunable_globals() -> Dict[str, Any]: keys = [ "FILTER_EXTRA_PEOPLE", "SUPER_SMOOTH_ALPHA", "MAX_GAP_FRAMES", "MIN_RUN_FRAMES", "DENSE_SUPER_SMOOTH_ALPHA", "DENSE_MAX_GAP_FRAMES", "DENSE_MIN_RUN_FRAMES", ] return {k: globals().get(k) for k in keys} def _restore_tunable_globals(old: Dict[str, Any]) -> None: for k, v in old.items(): globals()[k] = v def _is_valid_xyc(x: float, y: float, c: float) -> bool: if c is None: return False if c <= 0: return False if x == 0 and y == 0: return False if math.isnan(x) or math.isnan(y) or math.isnan(c): return False return True def _reshape_keypoints_2d(arr: List[float]) -> List[Tuple[float, float, float]]: if arr is None: return [] if len(arr) % 3 != 0: raise ValueError(f"keypoints length not multiple of 3: {len(arr)}") out = [] for i in range(0, len(arr), 3): out.append((float(arr[i]), float(arr[i + 1]), float(arr[i + 2]))) return out def _flatten_keypoints_2d(kps: List[Tuple[float, float, float]]) -> List[float]: out: List[float] = [] for x, y, c in kps: out.extend([float(x), float(y), float(c)]) return out def _sum_conf(arr: Optional[List[float]], sample_step: int = 1) -> float: if not arr: return 0.0 s = 0.0 for i in range(2, len(arr), 3 * sample_step): try: c = float(arr[i]) except Exception: c = 0.0 if c > 0: s += c return s def _body_center_from_pose(pose_arr: Optional[List[float]]) -> Optional[Tuple[float, float]]: if not pose_arr: return None kps = _reshape_keypoints_2d(pose_arr) idxs = [2, 5, 8, 11, 1] pts = [] for idx in idxs: if idx < len(kps): x, y, c = kps[idx] if _is_valid_xyc(x, y, c): pts.append((x, y)) if not pts: for x, y, c in kps: if _is_valid_xyc(x, y, c): pts.append((x, y)) if not pts: return None cx = sum(p[0] for p in pts) / len(pts) cy = sum(p[1] for p in pts) / len(pts) return (cx, cy) def _dist(a: Tuple[float, float], b: Tuple[float, float]) -> float: return math.hypot(a[0] - b[0], a[1] - b[1]) def _choose_single_person( people: List[Dict[str, Any]], prev_center: Optional[Tuple[float, float]] ) -> Optional[Dict[str, Any]]: if not people: return None best = None best_score = -1e18 for p in people: pose = p.get("pose_keypoints_2d") face = p.get("face_keypoints_2d") lh = p.get("hand_left_keypoints_2d") rh = p.get("hand_right_keypoints_2d") score = _sum_conf(pose) score += FACE_WEIGHT_IN_SCORE * _sum_conf(face, sample_step=4) score += HAND_WEIGHT_IN_SCORE * (_sum_conf(lh, sample_step=2) + _sum_conf(rh, sample_step=2)) center = _body_center_from_pose(pose) if prev_center is not None and center is not None: score -= TRACK_DIST_PENALTY * _dist(prev_center, center) if score > best_score: best_score = score best = p return best @dataclass class _Track: frames: Dict[int, Dict[str, Any]] centers: Dict[int, Tuple[float, float]] last_t: int last_center: Tuple[float, float] def _estimate_torso_scale(pose: List[Tuple[float, float, float]]) -> Optional[float]: def dist(i, k) -> Optional[float]: if i >= len(pose) or k >= len(pose): return None xi, yi, ci = pose[i] xk, yk, ck = pose[k] if not _is_valid_xyc(xi, yi, ci) or not _is_valid_xyc(xk, yk, ck): return None return math.hypot(xi - xk, yi - yk) cand = [dist(2, 5), dist(8, 11), dist(1, 8), dist(1, 11)] cand = [c for c in cand if c is not None and c > 1e-3] if not cand: return None return float(sum(cand) / len(cand)) def _track_match_threshold_from_pose(pose_arr: Optional[List[float]]) -> float: if isinstance(pose_arr, list): pose = _reshape_keypoints_2d(pose_arr) s = _estimate_torso_scale(pose) if s is not None: return max(float(TRACK_MATCH_MIN_PX), float(TRACK_MATCH_FACTOR) * float(s)) return float(max(TRACK_MATCH_MIN_PX, 120.0)) def _build_tracks_over_video(frames_data: List[Any]) -> List[_Track]: tracks: List[_Track] = [] for t, frame in enumerate(frames_data): if not isinstance(frame, dict): continue people = frame.get("people", []) if not isinstance(people, list) or not people: continue cand: List[Tuple[int, Dict[str, Any], Tuple[float, float]]] = [] for i, p in enumerate(people): if not isinstance(p, dict): continue pose = p.get("pose_keypoints_2d") c = _body_center_from_pose(pose) if c is None: continue cand.append((i, p, c)) if not cand: continue used = set() track_order = sorted(range(len(tracks)), key=lambda k: tracks[k].last_t, reverse=True) for k in track_order: tr = tracks[k] age = t - tr.last_t if age > int(TRACK_MAX_FRAME_GAP): continue best_idx = None best_d = 1e18 for i, p, cc in cand: if i in used: continue thr = _track_match_threshold_from_pose(p.get("pose_keypoints_2d")) d = _dist(tr.last_center, cc) if d <= thr and d < best_d: best_d = d best_idx = i if best_idx is not None: i, p, cc = next(x for x in cand if x[0] == best_idx) used.add(i) tr.frames[t] = p tr.centers[t] = cc tr.last_t = t tr.last_center = cc for i, p, cc in cand: if i in used: continue tracks.append(_Track(frames={t: p}, centers={t: cc}, last_t=t, last_center=cc)) return tracks def _track_presence_score(tr: _Track) -> Tuple[int, float, float]: frames_count = len(tr.frames) face_sum = 0.0 body_sum = 0.0 for p in tr.frames.values(): face_sum += _sum_conf(p.get("face_keypoints_2d"), sample_step=4) body_sum += _sum_conf(p.get("pose_keypoints_2d"), sample_step=1) return (frames_count, face_sum, body_sum) def _pick_main_track(tracks: List[_Track]) -> Optional[_Track]: if not tracks: return None best = None best_key = (-1, -1e18, -1e18) for tr in tracks: key = _track_presence_score(tr) if key > best_key: best_key = key best = tr return best @dataclass class BodyState: last_xy: List[Optional[Tuple[float, float]]] last_v: List[Tuple[float, float]] def __init__(self, joints: int): self.last_xy = [None] * joints self.last_v = [(0.0, 0.0)] * joints def _smooth_body_pose(pose_arr: Optional[List[float]], state: BodyState) -> Optional[List[float]]: if pose_arr is None: return None kps = _reshape_keypoints_2d(pose_arr) J = len(kps) if len(state.last_xy) != J: state.last_xy = [None] * J state.last_v = [(0.0, 0.0)] * J out: List[Tuple[float, float, float]] = [] for j in range(J): x, y, c = kps[j] last = state.last_xy[j] vx_last, vy_last = state.last_v[j] valid_in = _is_valid_xyc(x, y, c) and (c >= CONF_GATE_BODY) if valid_in: if last is None: nx, ny = x, y state.last_xy[j] = (nx, ny) state.last_v[j] = (0.0, 0.0) out.append((nx, ny, float(c))) continue dx_raw = x - last[0] dy_raw = y - last[1] if abs(dx_raw) < EPS: dx_raw = 0.0 if abs(dy_raw) < EPS: dy_raw = 0.0 vx = VEL_ALPHA * dx_raw + (1.0 - VEL_ALPHA) * vx_last vy = VEL_ALPHA * dy_raw + (1.0 - VEL_ALPHA) * vy_last px = last[0] + vx py = last[1] + vy nx = ALPHA_BODY * x + (1.0 - ALPHA_BODY) * px ny = ALPHA_BODY * y + (1.0 - ALPHA_BODY) * py ddx = nx - last[0] ddy = ny - last[1] d = math.hypot(ddx, ddy) if d > MAX_STEP_BODY and d > 1e-6: scale = MAX_STEP_BODY / d nx = last[0] + ddx * scale ny = last[1] + ddy * scale vx = nx - last[0] vy = ny - last[1] state.last_xy[j] = (nx, ny) state.last_v[j] = (vx, vy) out.append((nx, ny, float(c))) else: out.append((float(x), float(y), float(c))) return _flatten_keypoints_2d(out) COCO18_EDGES = [ (1, 2), (2, 3), (3, 4), (1, 5), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11), (11, 12), (12, 13), (8, 11), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), ] HAND21_EDGES = [ (0, 1), (1, 2), (2, 3), (3, 4), (0, 5), (5, 6), (6, 7), (7, 8), (0, 9), (9, 10), (10, 11), (11, 12), (0, 13), (13, 14), (14, 15), (15, 16), (0, 17), (17, 18), (18, 19), (19, 20), ] _NEIGHBORS = None def _build_neighbors(): global _NEIGHBORS if _NEIGHBORS is not None: return neigh = {} for a, b in COCO18_EDGES: neigh.setdefault(a, set()).add(b) neigh.setdefault(b, set()).add(a) _NEIGHBORS = neigh def _suppress_spatial_outliers_in_pose_arr( pose_arr: Optional[List[float]], *, conf_gate: float ) -> Optional[List[float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return pose_arr pose = _reshape_keypoints_2d(pose_arr) J = len(pose) center = _body_center_from_pose(pose_arr) scale = _estimate_torso_scale(pose) if center is None or scale is None: return pose_arr cx, cy = center max_r = TORSO_RADIUS_FACTOR * scale max_bone = BONE_MAX_FACTOR * scale out = [list(p) for p in pose] def visible(j: int) -> bool: if j >= J: return False x, y, c = out[j] return (c >= conf_gate) and not (x == 0 and y == 0) for j in range(J): x, y, c = out[j] if c >= conf_gate and not (x == 0 and y == 0): if math.hypot(x - cx, y - cy) > max_r: out[j] = [0.0, 0.0, 0.0] for a, b in COCO18_EDGES: if a >= J or b >= J: continue if not visible(a) or not visible(b): continue ax, ay, ac = out[a] bx, by, bc = out[b] d = math.hypot(ax - bx, ay - by) if d > max_bone: if ac <= bc: out[a] = [0.0, 0.0, 0.0] else: out[b] = [0.0, 0.0, 0.0] flat: List[float] = [] for x, y, c in out: flat.extend([float(x), float(y), float(c)]) return flat def _suppress_isolated_joints_in_pose_arr( pose_arr: Optional[List[float]], *, conf_gate: float, keep: set[int] = None ) -> Optional[List[float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return pose_arr _build_neighbors() pose = _reshape_keypoints_2d(pose_arr) J = len(pose) out = [list(p) for p in pose] if keep is None: keep = set() def vis(j: int) -> bool: if j >= J: return False x, y, c = out[j] return (c >= conf_gate) and not (x == 0 and y == 0) for j in range(J): if j in keep: continue if not vis(j): continue neighs = _NEIGHBORS.get(j, set()) if not any((n < J and vis(n)) for n in neighs): out[j] = [0.0, 0.0, 0.0] flat = [] for x, y, c in out: flat.extend([float(x), float(y), float(c)]) return flat def _denoise_and_fill_gaps_pose_seq( pose_arr_seq: List[Optional[List[float]]], *, conf_gate: float, min_run: int, max_gap: int, ) -> List[Optional[List[float]]]: if not pose_arr_seq: return pose_arr_seq J = None for arr in pose_arr_seq: if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: J = len(arr) // 3 break if J is None: return pose_arr_seq T = len(pose_arr_seq) out_seq: List[Optional[List[float]]] = [] for arr in pose_arr_seq: if isinstance(arr, list) and len(arr) == J * 3: out_seq.append(list(arr)) else: out_seq.append(arr) def is_vis(arr: List[float], j: int) -> bool: x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) c = float(arr[3 * j + 2]) return (c >= conf_gate) and not (x == 0 and y == 0) # 1) remove short flashes for j in range(J): start = None for t in range(T + 1): cur = False if t < T and isinstance(out_seq[t], list): cur = is_vis(out_seq[t], j) if cur and start is None: start = t if (not cur) and start is not None: run_len = t - start if run_len < min_run: for k in range(start, t): if not isinstance(out_seq[k], list): continue out_seq[k][3 * j + 0] = 0.0 out_seq[k][3 * j + 1] = 0.0 out_seq[k][3 * j + 2] = 0.0 start = None # 2) gap fill only if returns for j in range(J): last_vis_t = None t = 0 while t < T: arr = out_seq[t] if not isinstance(arr, list): t += 1 continue cur_vis = is_vis(arr, j) if cur_vis: last_vis_t = t t += 1 continue if last_vis_t is None: t += 1 continue gap_start = t t2 = t while t2 < T: arr2 = out_seq[t2] if isinstance(arr2, list) and is_vis(arr2, j): break t2 += 1 if t2 >= T: break gap_len = t2 - gap_start if gap_len <= 0: t = t2 continue if gap_len <= max_gap: a = out_seq[last_vis_t] b = out_seq[t2] if isinstance(a, list) and isinstance(b, list): ax, ay, ac = float(a[3 * j + 0]), float(a[3 * j + 1]), float(a[3 * j + 2]) bx, by, bc = float(b[3 * j + 0]), float(b[3 * j + 1]), float(b[3 * j + 2]) if not (ax == 0 and ay == 0) and not (bx == 0 and by == 0): conf_fill = min(ac, bc) for k in range(gap_len): tt = gap_start + k if not isinstance(out_seq[tt], list): continue r = (k + 1) / (gap_len + 1) x = ax + (bx - ax) * r y = ay + (by - ay) * r out_seq[tt][3 * j + 0] = float(x) out_seq[tt][3 * j + 1] = float(y) out_seq[tt][3 * j + 2] = float(conf_fill) t = t2 return out_seq def _zero_lag_ema_pose_seq( pose_seq: List[Optional[List[float]]], *, alpha: float, conf_gate: float ) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq J = None for arr in pose_seq: if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: J = len(arr) // 3 break if J is None: return pose_seq T = len(pose_seq) def is_vis(arr: List[float], j: int) -> bool: x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) c = float(arr[3 * j + 2]) return (c >= conf_gate) and not (x == 0 and y == 0) fwd = [None] * T last = [None] * J for t in range(T): arr = pose_seq[t] if not isinstance(arr, list) or len(arr) != J * 3: fwd[t] = arr continue out = list(arr) for j in range(J): if is_vis(arr, j): x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) if last[j] is None: sx, sy = x, y else: sx = alpha * x + (1 - alpha) * last[j][0] sy = alpha * y + (1 - alpha) * last[j][1] last[j] = (sx, sy) out[3 * j + 0] = float(sx) out[3 * j + 1] = float(sy) fwd[t] = out bwd = [None] * T last = [None] * J for t in range(T - 1, -1, -1): arr = fwd[t] if not isinstance(arr, list) or len(arr) != J * 3: bwd[t] = arr continue out = list(arr) for j in range(J): if is_vis(arr, j): x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) if last[j] is None: sx, sy = x, y else: sx = alpha * x + (1 - alpha) * last[j][0] sy = alpha * y + (1 - alpha) * last[j][1] last[j] = (sx, sy) out[3 * j + 0] = float(sx) out[3 * j + 1] = float(sy) bwd[t] = out return bwd def _apply_root_scale( pose_arr: Optional[List[float]], *, src_root: Tuple[float, float], src_scale: float, dst_root: Tuple[float, float], dst_scale: float, ) -> Optional[List[float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return pose_arr if src_scale <= 1e-6 or dst_scale <= 1e-6: return pose_arr kps = _reshape_keypoints_2d(pose_arr) out = [] s = dst_scale / src_scale for x, y, c in kps: if c <= 0 or (x == 0 and y == 0): out.append((x, y, c)) continue nx = dst_root[0] + (x - src_root[0]) * s ny = dst_root[1] + (y - src_root[1]) * s out.append((nx, ny, c)) return _flatten_keypoints_2d(out) def _carry_pose_when_torso_missing( pose_seq: List[Optional[List[float]]], *, conf_gate: float, max_carry: int, anchor_joints: List[int], min_anchors: int, ) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq J = None for arr in pose_seq: if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: J = len(arr) // 3 break if J is None: return pose_seq out = [a if a is None else list(a) for a in pose_seq] FILL_JOINTS = {1, 8, 9, 10, 11, 12, 13} FILL_JOINTS -= set(ALLOW_DISAPPEAR_JOINTS) def is_vis_flat(arr: List[float], j: int) -> bool: x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) c = float(arr[3 * j + 2]) return (c >= conf_gate) and not (x == 0 and y == 0) def count_visible(arr: List[float], joints: List[int]) -> int: c = 0 for j in joints: if j < J and is_vis_flat(arr, j): c += 1 return c def root_scale_from_anchors(arr: List[float]) -> Optional[Tuple[Tuple[float, float], float]]: pts = [] for j in anchor_joints: if j >= J: continue if is_vis_flat(arr, j): x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) pts.append((x, y)) if len(pts) < min_anchors: return None rx = sum(p[0] for p in pts) / len(pts) ry = sum(p[1] for p in pts) / len(pts) xs = [p[0] for p in pts] ys = [p[1] for p in pts] scale = max(max(xs) - min(xs), max(ys) - min(ys)) if scale <= 1e-3: return None return (rx, ry), float(scale) last_good: Optional[List[float]] = None last_good_rs: Optional[Tuple[Tuple[float, float], float]] = None carry_left = 0 for t in range(len(out)): arr = out[t] if not isinstance(arr, list) or len(arr) != J * 3: continue anchors_ok = count_visible(arr, anchor_joints) >= min_anchors fill_vis = sum(1 for j in FILL_JOINTS if j < J and is_vis_flat(arr, j)) rs = root_scale_from_anchors(arr) if anchors_ok and rs is not None and fill_vis >= 2: last_good = list(arr) last_good_rs = rs carry_left = max_carry continue if anchors_ok and rs is not None and last_good is not None and last_good_rs is not None and carry_left > 0: dst_root, dst_scale = rs src_root, src_scale = last_good_rs carried_full = _apply_root_scale( last_good, src_root=src_root, src_scale=src_scale, dst_root=dst_root, dst_scale=dst_scale, ) if isinstance(carried_full, list) and len(carried_full) == J * 3: for j in FILL_JOINTS: if j >= J: continue if is_vis_flat(arr, j): continue cx = float(carried_full[3 * j + 0]) cy = float(carried_full[3 * j + 1]) cc = float(carried_full[3 * j + 2]) if (cx == 0 and cy == 0) or cc <= 0: continue arr[3 * j + 0] = cx arr[3 * j + 1] = cy arr[3 * j + 2] = max(min(cc, 0.60), conf_gate) out[t] = arr carry_left -= 1 continue carry_left = max(carry_left - 1, 0) return out def _force_full_torso_pair( pose_seq: List[Optional[List[float]]], *, conf_gate: float, anchor_joints: List[int], min_anchors: int, max_lookback: int = 240, fill_legs_with_hip: bool = True, always_fill_if_one_hip: bool = True, ) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq J = None for arr in pose_seq: if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: J = len(arr) // 3 break if J is None: return pose_seq out = [a if a is None else list(a) for a in pose_seq] R_HIP, R_KNEE, R_ANK = 8, 9, 10 L_HIP, L_KNEE, L_ANK = 11, 12, 13 def is_vis(arr: List[float], j: int) -> bool: if j >= J: return False x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) c = float(arr[3 * j + 2]) return (c >= conf_gate) and not (x == 0 and y == 0) def count_visible(arr: List[float], joints: List[int]) -> int: c = 0 for j in joints: if is_vis(arr, j): c += 1 return c def root_scale_from_anchors(arr: List[float]) -> Optional[Tuple[Tuple[float, float], float]]: pts = [] for j in anchor_joints: if j >= J: continue if is_vis(arr, j): pts.append((float(arr[3 * j + 0]), float(arr[3 * j + 1]))) if len(pts) < min_anchors: return None rx = sum(p[0] for p in pts) / len(pts) ry = sum(p[1] for p in pts) / len(pts) xs = [p[0] for p in pts] ys = [p[1] for p in pts] scale = max(max(xs) - min(xs), max(ys) - min(ys)) if scale <= 1e-3: return None return (rx, ry), float(scale) last_full_idx = None last_full = None last_full_rs = None for t in range(len(out)): arr = out[t] if not isinstance(arr, list) or len(arr) != J * 3: continue rs = root_scale_from_anchors(arr) r_ok = is_vis(arr, R_HIP) l_ok = is_vis(arr, L_HIP) anchors_ok = count_visible(arr, anchor_joints) >= min_anchors if anchors_ok and rs is not None and r_ok and l_ok: last_full_idx = t last_full = list(arr) last_full_rs = rs continue if last_full is None or last_full_rs is None or last_full_idx is None: continue if (t - last_full_idx) > max_lookback: continue if not (r_ok or l_ok): continue if r_ok and l_ok: continue if not always_fill_if_one_hip: continue if rs is None: continue dst_root, dst_scale = rs src_root, src_scale = last_full_rs carried = _apply_root_scale( last_full, src_root=src_root, src_scale=src_scale, dst_root=dst_root, dst_scale=dst_scale, ) if not (isinstance(carried, list) and len(carried) == J * 3): continue def copy_joint(j: int): if j >= J: return if is_vis(arr, j): return cx = float(carried[3 * j + 0]) cy = float(carried[3 * j + 1]) cc = float(carried[3 * j + 2]) if (cx == 0 and cy == 0) or cc <= 0: return arr[3 * j + 0] = cx arr[3 * j + 1] = cy arr[3 * j + 2] = max(min(cc, 0.60), conf_gate) if not r_ok: copy_joint(R_HIP) if fill_legs_with_hip: copy_joint(R_KNEE) copy_joint(R_ANK) if not l_ok: copy_joint(L_HIP) if fill_legs_with_hip: copy_joint(L_KNEE) copy_joint(L_ANK) out[t] = arr return out def _median3_pose_seq(pose_seq: List[Optional[List[float]]], *, conf_gate: float) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq J = None for arr in pose_seq: if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: J = len(arr) // 3 break if J is None: return pose_seq T = len(pose_seq) def is_vis(arr: List[float], j: int) -> bool: x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) c = float(arr[3 * j + 2]) return (c >= conf_gate) and not (x == 0 and y == 0) out_seq: List[Optional[List[float]]] = [] for t in range(T): arr = pose_seq[t] if not isinstance(arr, list) or len(arr) != J * 3: out_seq.append(arr) continue out = list(arr) t0 = max(0, t - 1) t1 = t t2 = min(T - 1, t + 1) a0 = pose_seq[t0] a1 = pose_seq[t1] a2 = pose_seq[t2] for j in range(J): if not is_vis(arr, j): continue xs, ys = [], [] for aa in (a0, a1, a2): if isinstance(aa, list) and len(aa) == J * 3 and is_vis(aa, j): xs.append(float(aa[3 * j + 0])) ys.append(float(aa[3 * j + 1])) if len(xs) >= 2: xs.sort() ys.sort() out[3 * j + 0] = float(xs[len(xs) // 2]) out[3 * j + 1] = float(ys[len(ys) // 2]) out_seq.append(out) return out_seq def _sync_group_appearances( pose_arr_seq: List[Optional[List[float]]], *, group: set[int], conf_gate: float, lookahead: int, ) -> List[Optional[List[float]]]: if not pose_arr_seq: return pose_arr_seq J = None for arr in pose_arr_seq: if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: J = len(arr) // 3 break if J is None: return pose_arr_seq T = len(pose_arr_seq) out_seq: List[Optional[List[float]]] = [] for arr in pose_arr_seq: if isinstance(arr, list) and len(arr) == J * 3: out_seq.append(list(arr)) else: out_seq.append(arr) def is_vis(arr: List[float], j: int) -> bool: x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) c = float(arr[3 * j + 2]) return (c >= conf_gate) and not (x == 0 and y == 0) for t in range(T): arr = out_seq[t] if not isinstance(arr, list): continue vis = {j for j in group if j < J and is_vis(arr, j)} if not vis: continue missing = {j for j in group if j < J and j not in vis} if not missing: continue appear_t: dict[int, int] = {} for j in list(missing): t2 = t + 1 while t2 < T and t2 <= t + lookahead: arr2 = out_seq[t2] if isinstance(arr2, list) and is_vis(arr2, j): appear_t[j] = t2 break t2 += 1 if not appear_t: continue for j, t2 in appear_t.items(): last_t = None for tb in range(t - 1, -1, -1): arrb = out_seq[tb] if isinstance(arrb, list) and is_vis(arrb, j): last_t = tb break if last_t is None: b = out_seq[t2] if not isinstance(b, list): continue bx, by, bc = float(b[3 * j + 0]), float(b[3 * j + 1]), float(b[3 * j + 2]) for k in range(t, t2): a = out_seq[k] if not isinstance(a, list): continue a[3 * j + 0] = bx a[3 * j + 1] = by a[3 * j + 2] = bc continue a0 = out_seq[last_t] b0 = out_seq[t2] if not (isinstance(a0, list) and isinstance(b0, list)): continue ax, ay, ac = float(a0[3 * j + 0]), float(a0[3 * j + 1]), float(a0[3 * j + 2]) bx, by, bc = float(b0[3 * j + 0]), float(b0[3 * j + 1]), float(b0[3 * j + 2]) if (ax == 0 and ay == 0) or (bx == 0 and by == 0): continue conf_fill = min(ac, bc) total = t2 - last_t if total <= 0: continue for tt in range(t, t2): a = out_seq[tt] if not isinstance(a, list): continue r = (tt - last_t) / total x = ax + (bx - ax) * r y = ay + (by - ay) * r a[3 * j + 0] = float(x) a[3 * j + 1] = float(y) a[3 * j + 2] = float(conf_fill) return out_seq def _count_valid_points(arr: Optional[List[float]], *, conf_gate: float) -> int: if not isinstance(arr, list) or len(arr) % 3 != 0: return 0 cnt = 0 for i in range(0, len(arr), 3): x, y, c = float(arr[i]), float(arr[i + 1]), float(arr[i + 2]) if c >= conf_gate and not (x == 0 and y == 0): cnt += 1 return cnt def _zero_out_kps(arr: Optional[List[float]]) -> Optional[List[float]]: if not isinstance(arr, list) or len(arr) % 3 != 0: return arr out = list(arr) for i in range(0, len(out), 3): out[i + 0] = 0.0 out[i + 1] = 0.0 out[i + 2] = 0.0 return out def _pin_body_wrist_to_hand( p_out: Dict[str, Any], *, side: str, conf_gate_body: float = 0.2, conf_gate_hand: float = 0.2, blend: float = 1.0, ) -> None: if side == "right": bw = 4 hk = "hand_right_keypoints_2d" else: bw = 7 hk = "hand_left_keypoints_2d" pose = p_out.get("pose_keypoints_2d") hand = p_out.get(hk) if not (isinstance(pose, list) and isinstance(hand, list)): return if len(pose) < (bw * 3 + 3): return if len(hand) < 3: return hx, hy, hc = float(hand[0]), float(hand[1]), float(hand[2]) if hc < conf_gate_hand or (hx == 0.0 and hy == 0.0): return bx, by, bc = float(pose[bw * 3 + 0]), float(pose[bw * 3 + 1]), float(pose[bw * 3 + 2]) if bc < conf_gate_body or (bx == 0.0 and by == 0.0): pose[bw * 3 + 0] = hx pose[bw * 3 + 1] = hy pose[bw * 3 + 2] = float(max(bc, min(hc, 0.9))) else: nx = bx * (1.0 - blend) + hx * blend ny = by * (1.0 - blend) + hy * blend pose[bw * 3 + 0] = nx pose[bw * 3 + 1] = ny pose[bw * 3 + 2] = float(min(bc, hc)) p_out["pose_keypoints_2d"] = pose def _fix_elbow_using_wrist(p_out: Dict[str, Any], *, side: str, conf_gate: float = 0.2) -> None: pose = p_out.get("pose_keypoints_2d") if not isinstance(pose, list) or len(pose) % 3 != 0: return if side == "right": sh, el, wr = 2, 3, 4 else: sh, el, wr = 5, 6, 7 def get(j): return float(pose[3 * j + 0]), float(pose[3 * j + 1]), float(pose[3 * j + 2]) def vis(x, y, c): return c >= conf_gate and not (x == 0.0 and y == 0.0) sx, sy, sc = get(sh) ex, ey, ec = get(el) wx, wy, wc = get(wr) if not (vis(sx, sy, sc) and vis(wx, wy, wc)): return if vis(ex, ey, ec): Lse = math.hypot(ex - sx, ey - sy) Lew = math.hypot(wx - ex, wy - ey) else: dsw = math.hypot(wx - sx, wy - sy) if dsw < 1e-3: return Lse = 0.55 * dsw Lew = 0.45 * dsw dx = wx - sx dy = wy - sy d = math.hypot(dx, dy) if d < 1e-6: return d2 = max(min(d, (Lse + Lew) - 1e-3), abs(Lse - Lew) + 1e-3) a = (Lse * Lse - Lew * Lew + d2 * d2) / (2.0 * d2) h2 = max(Lse * Lse - a * a, 0.0) h = math.sqrt(h2) ux = dx / d uy = dy / d px = sx + a * ux py = sy + a * uy rx = -uy ry = ux e1x, e1y = px + h * rx, py + h * ry e2x, e2y = px - h * rx, py - h * ry if vis(ex, ey, ec): if math.hypot(e1x - ex, e1y - ey) <= math.hypot(e2x - ex, e2y - ey): nx, ny = e1x, e1y else: nx, ny = e2x, e2y else: nx, ny = e1x, e1y pose[3 * el + 0] = float(nx) pose[3 * el + 1] = float(ny) pose[3 * el + 2] = float(max(min(ec, 0.8), conf_gate)) p_out["pose_keypoints_2d"] = pose def _remove_short_presence_runs_kps_seq( seq: List[Optional[List[float]]], *, conf_gate: float, min_points_present: int, min_run: int, ) -> List[Optional[List[float]]]: if not seq: return seq present = [(_count_valid_points(a, conf_gate=conf_gate) >= min_points_present) for a in seq] out = [None if a is None else list(a) for a in seq] start = None for t in range(len(seq) + 1): cur = present[t] if t < len(seq) else False if cur and start is None: start = t if (not cur) and start is not None: run_len = t - start if run_len < min_run: for k in range(start, t): out[k] = _zero_out_kps(out[k]) start = None return out def _zero_sparse_frames_kps_seq( seq: List[Optional[List[float]]], *, conf_gate: float, min_points_present: int ) -> List[Optional[List[float]]]: if not seq: return seq out: List[Optional[List[float]]] = [] for a in seq: if not isinstance(a, list): out.append(a) continue if _count_valid_points(a, conf_gate=conf_gate) < min_points_present: out.append(_zero_out_kps(a)) else: out.append(a) return out def _suppress_spatial_outliers_in_hand_arr( hand_arr: Optional[List[float]], *, conf_gate: float, max_bone_factor: float = 3.0 ) -> Optional[List[float]]: if not isinstance(hand_arr, list) or len(hand_arr) % 3 != 0: return hand_arr pts = _reshape_keypoints_2d(hand_arr) J = len(pts) if J < 21: return hand_arr out = [list(p) for p in pts] def vis(j: int) -> bool: x, y, c = out[j] return c >= conf_gate and not (x == 0 and y == 0) vv = [(x, y) for (x, y, c) in out if c >= conf_gate and not (x == 0 and y == 0)] if len(vv) < 6: return hand_arr xs = [p[0] for p in vv] ys = [p[1] for p in vv] scale = max(max(xs) - min(xs), max(ys) - min(ys)) if scale <= 1e-3: return hand_arr max_bone = max_bone_factor * scale for a, b in HAND21_EDGES: if a >= J or b >= J: continue if not vis(a) or not vis(b): continue ax, ay, ac = out[a] bx, by, bc = out[b] d = math.hypot(ax - bx, ay - by) if d > max_bone: if ac <= bc: out[a] = [0.0, 0.0, 0.0] else: out[b] = [0.0, 0.0, 0.0] return _flatten_keypoints_2d([(x, y, c) for x, y, c in out]) def _body_head_root_scale_from_pose( pose_arr: Optional[List[float]], *, conf_gate: float ) -> Optional[Tuple[Tuple[float, float], float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return None kps = _reshape_keypoints_2d(pose_arr) def vis(j: int) -> Optional[Tuple[float, float]]: if j >= len(kps): return None x, y, c = kps[j] if c >= conf_gate and not (x == 0 and y == 0): return (float(x), float(y)) return None pts = [] for j in [0, 1, 14, 15, 16, 17]: p = vis(j) if p is not None: pts.append(p) if not pts: return None rx = sum(p[0] for p in pts) / len(pts) ry = sum(p[1] for p in pts) / len(pts) root = (rx, ry) def dist(a: int, b: int) -> Optional[float]: pa, pb = vis(a), vis(b) if pa is None or pb is None: return None d = math.hypot(pa[0] - pb[0], pa[1] - pb[1]) return d if d > 1e-3 else None cands = [dist(14, 15), dist(16, 17), dist(2, 5)] cands = [c for c in cands if c is not None] if not cands: return None scale = float(sum(cands) / len(cands)) return root, scale def _body_wrist_root_scale_from_pose( pose_arr: Optional[List[float]], *, side: str, conf_gate: float ) -> Optional[Tuple[Tuple[float, float], float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return None kps = _reshape_keypoints_2d(pose_arr) if side == "right": w, e = 4, 3 else: w, e = 7, 6 def vis(j: int) -> Optional[Tuple[float, float]]: if j >= len(kps): return None x, y, c = kps[j] if c >= conf_gate and not (x == 0 and y == 0): return (float(x), float(y)) return None pw = vis(w) if pw is None: return None root = pw pe = vis(e) scale = None if pe is not None: d = math.hypot(pw[0] - pe[0], pw[1] - pe[1]) if d > 1e-3: scale = d if scale is None: p2 = vis(2) p5 = vis(5) if p2 is not None and p5 is not None: d = math.hypot(p2[0] - p5[0], p2[1] - p5[1]) if d > 1e-3: scale = d if scale is None: return None return root, float(scale) def _smooth_dense_seq_anchored_to_body( dense_seq: List[Optional[List[float]]], body_pose_seq: List[Optional[List[float]]], *, kind: str, conf_gate_dense: float, conf_gate_body: float, median3: bool, zero_lag_alpha: float, ) -> List[Optional[List[float]]]: if not dense_seq: return dense_seq Jd = None for a in dense_seq: if isinstance(a, list) and len(a) % 3 == 0 and len(a) > 0: Jd = len(a) // 3 break if Jd is None: return dense_seq T = len(dense_seq) out = [None if a is None else list(a) for a in dense_seq] norm_seq: List[Optional[List[float]]] = [None] * T for t in range(T): arr = out[t] body = body_pose_seq[t] if t < len(body_pose_seq) else None if not isinstance(arr, list) or len(arr) != Jd * 3 or not isinstance(body, list): norm_seq[t] = arr continue if kind == "face": rs = _body_head_root_scale_from_pose(body, conf_gate=conf_gate_body) elif kind == "hand_left": rs = _body_wrist_root_scale_from_pose(body, side="left", conf_gate=conf_gate_body) else: rs = _body_wrist_root_scale_from_pose(body, side="right", conf_gate=conf_gate_body) if rs is None: norm_seq[t] = arr continue (rx, ry), s = rs if s <= 1e-6: norm_seq[t] = arr continue nn = list(arr) for j in range(Jd): x = float(arr[3 * j + 0]) y = float(arr[3 * j + 1]) c = float(arr[3 * j + 2]) if c >= conf_gate_dense and not (x == 0 and y == 0): nn[3 * j + 0] = (x - rx) / s nn[3 * j + 1] = (y - ry) / s norm_seq[t] = nn if median3: norm_seq = _median3_pose_seq(norm_seq, conf_gate=conf_gate_dense) norm_seq = _zero_lag_ema_pose_seq(norm_seq, alpha=zero_lag_alpha, conf_gate=conf_gate_dense) for t in range(T): arrn = norm_seq[t] body = body_pose_seq[t] if t < len(body_pose_seq) else None if not isinstance(arrn, list) or len(arrn) != Jd * 3 or not isinstance(body, list): continue if kind == "face": rs = _body_head_root_scale_from_pose(body, conf_gate=conf_gate_body) elif kind == "hand_left": rs = _body_wrist_root_scale_from_pose(body, side="left", conf_gate=conf_gate_body) else: rs = _body_wrist_root_scale_from_pose(body, side="right", conf_gate=conf_gate_body) if rs is None: continue (rx, ry), s = rs if s <= 1e-6: continue orig = out[t] for j in range(Jd): x = float(arrn[3 * j + 0]) y = float(arrn[3 * j + 1]) c = float(arrn[3 * j + 2]) ox = float(orig[3 * j + 0]) oy = float(orig[3 * j + 1]) oc = float(orig[3 * j + 2]) if oc >= conf_gate_dense and not (ox == 0 and oy == 0) and c >= conf_gate_dense: orig[3 * j + 0] = rx + x * s orig[3 * j + 1] = ry + y * s out[t] = orig return out def smooth_KPS_json_obj( data: Any, *, keep_face_untouched: bool = True, keep_hands_untouched: bool = True, filter_extra_people: Optional[bool] = None, ) -> Any: if not isinstance(data, list): raise ValueError("Expected top-level JSON to be a list of frames.") if filter_extra_people is None: filter_extra_people = bool(FILTER_EXTRA_PEOPLE) chosen_people: List[Optional[Dict[str, Any]]] = [None] * len(data) if MAIN_PERSON_MODE == "longest_track": tracks = _build_tracks_over_video(data) main_tr = _pick_main_track(tracks) if main_tr is not None: for t in range(len(data)): if t in main_tr.frames: chosen_people[t] = main_tr.frames[t] else: prev_center: Optional[Tuple[float, float]] = None for i, frame in enumerate(data): if not isinstance(frame, dict): continue people = frame.get("people", []) if not isinstance(people, list) or len(people) == 0: continue chosen = _choose_single_person(people, prev_center) chosen_people[i] = chosen if chosen is not None: c = _body_center_from_pose(chosen.get("pose_keypoints_2d")) if c is not None: prev_center = c else: prev_center: Optional[Tuple[float, float]] = None for i, frame in enumerate(data): if not isinstance(frame, dict): continue people = frame.get("people", []) if not isinstance(people, list) or len(people) == 0: continue chosen = _choose_single_person(people, prev_center) chosen_people[i] = chosen if chosen is not None: c = _body_center_from_pose(chosen.get("pose_keypoints_2d")) if c is not None: prev_center = c pose_seq: List[Optional[List[float]]] = [] for p in chosen_people: pose_seq.append(p.get("pose_keypoints_2d") if isinstance(p, dict) else None) if SPATIAL_OUTLIER_FIX: pose_seq = [ _suppress_spatial_outliers_in_pose_arr(arr, conf_gate=CONF_GATE_BODY) if arr is not None else None for arr in pose_seq ] if GAP_FILL_ENABLED: pose_seq = _denoise_and_fill_gaps_pose_seq( pose_seq, conf_gate=CONF_GATE_BODY, min_run=MIN_RUN_FRAMES, max_gap=MAX_GAP_FRAMES, ) if TORSO_SYNC_ENABLED: pose_seq = _sync_group_appearances( pose_seq, group=TORSO_JOINTS, conf_gate=CONF_GATE_BODY, lookahead=TORSO_LOOKAHEAD_FRAMES, ) pose_seq = [ ( _suppress_isolated_joints_in_pose_arr(arr, conf_gate=CONF_GATE_BODY, keep=TORSO_JOINTS) if arr is not None else None ) for arr in pose_seq ] if MEDIAN3_ENABLED: pose_seq = _median3_pose_seq(pose_seq, conf_gate=CONF_GATE_BODY) if SUPER_SMOOTH_ENABLED: pose_seq = _zero_lag_ema_pose_seq(pose_seq, alpha=SUPER_SMOOTH_ALPHA, conf_gate=SUPER_SMOOTH_MIN_CONF) if ROOTSCALE_CARRY_ENABLED: pose_seq = _carry_pose_when_torso_missing( pose_seq, conf_gate=CARRY_CONF_GATE, max_carry=CARRY_MAX_FRAMES, anchor_joints=CARRY_ANCHOR_JOINTS, min_anchors=CARRY_MIN_ANCHORS, ) pose_seq = _force_full_torso_pair( pose_seq, conf_gate=CARRY_CONF_GATE, anchor_joints=CARRY_ANCHOR_JOINTS, min_anchors=CARRY_MIN_ANCHORS, max_lookback=240, fill_legs_with_hip=True, always_fill_if_one_hip=True, ) face_seq: List[Optional[List[float]]] = [] lh_seq: List[Optional[List[float]]] = [] rh_seq: List[Optional[List[float]]] = [] for p in chosen_people: if isinstance(p, dict): face_seq.append(p.get("face_keypoints_2d")) lh_seq.append(p.get("hand_left_keypoints_2d")) rh_seq.append(p.get("hand_right_keypoints_2d")) else: face_seq.append(None) lh_seq.append(None) rh_seq.append(None) if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched): lh_seq = [ _suppress_spatial_outliers_in_hand_arr(a, conf_gate=CONF_GATE_HAND) if a is not None else None for a in lh_seq ] rh_seq = [ _suppress_spatial_outliers_in_hand_arr(a, conf_gate=CONF_GATE_HAND) if a is not None else None for a in rh_seq ] lh_seq = _remove_short_presence_runs_kps_seq( lh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT, min_run=MIN_HAND_RUN_FRAMES ) rh_seq = _remove_short_presence_runs_kps_seq( rh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT, min_run=MIN_HAND_RUN_FRAMES ) lh_seq = _zero_sparse_frames_kps_seq( lh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT ) rh_seq = _zero_sparse_frames_kps_seq( rh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT ) if DENSE_GAP_FILL_ENABLED: lh_seq = _denoise_and_fill_gaps_pose_seq( lh_seq, conf_gate=CONF_GATE_HAND, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES ) rh_seq = _denoise_and_fill_gaps_pose_seq( rh_seq, conf_gate=CONF_GATE_HAND, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES ) if FACE_SMOOTH_ENABLED and (not keep_face_untouched): if DENSE_GAP_FILL_ENABLED: face_seq = _denoise_and_fill_gaps_pose_seq( face_seq, conf_gate=CONF_GATE_FACE, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES ) if FACE_SMOOTH_ENABLED and (not keep_face_untouched): face_seq = _smooth_dense_seq_anchored_to_body( face_seq, pose_seq, kind="face", conf_gate_dense=CONF_GATE_FACE, conf_gate_body=CONF_GATE_BODY, median3=DENSE_MEDIAN3_ENABLED, zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA, ) if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched): lh_seq = _smooth_dense_seq_anchored_to_body( lh_seq, pose_seq, kind="hand_left", conf_gate_dense=CONF_GATE_HAND, conf_gate_body=CONF_GATE_BODY, median3=DENSE_MEDIAN3_ENABLED, zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA, ) rh_seq = _smooth_dense_seq_anchored_to_body( rh_seq, pose_seq, kind="hand_right", conf_gate_dense=CONF_GATE_HAND, conf_gate_body=CONF_GATE_BODY, median3=DENSE_MEDIAN3_ENABLED, zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA, ) out_frames = [] body_state: Optional[BodyState] = None for i, frame in enumerate(data): if not isinstance(frame, dict): out_frames.append(frame) continue frame_out = copy.deepcopy(frame) chosen = chosen_people[i] if chosen is None: if filter_extra_people: frame_out["people"] = [] out_frames.append(frame_out) continue p_out = copy.deepcopy(chosen) p_out["pose_keypoints_2d"] = pose_seq[i] pose_arr = p_out.get("pose_keypoints_2d") joints = (len(pose_arr) // 3) if isinstance(pose_arr, list) else 0 if body_state is None: body_state = BodyState(joints if joints > 0 else 18) p_out["pose_keypoints_2d"] = _smooth_body_pose(p_out.get("pose_keypoints_2d"), body_state) if FACE_SMOOTH_ENABLED and (not keep_face_untouched): p_out["face_keypoints_2d"] = face_seq[i] else: p_out["face_keypoints_2d"] = chosen.get("face_keypoints_2d", p_out.get("face_keypoints_2d")) if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched): p_out["hand_left_keypoints_2d"] = lh_seq[i] p_out["hand_right_keypoints_2d"] = rh_seq[i] else: p_out["hand_left_keypoints_2d"] = chosen.get("hand_left_keypoints_2d", p_out.get("hand_left_keypoints_2d")) p_out["hand_right_keypoints_2d"] = chosen.get( "hand_right_keypoints_2d", p_out.get("hand_right_keypoints_2d") ) _pin_body_wrist_to_hand( p_out, side="left", conf_gate_body=CONF_GATE_BODY, conf_gate_hand=CONF_GATE_HAND, blend=1.0 ) _pin_body_wrist_to_hand( p_out, side="right", conf_gate_body=CONF_GATE_BODY, conf_gate_hand=CONF_GATE_HAND, blend=1.0 ) _fix_elbow_using_wrist(p_out, side="left", conf_gate=CONF_GATE_BODY) _fix_elbow_using_wrist(p_out, side="right", conf_gate=CONF_GATE_BODY) if filter_extra_people: frame_out["people"] = [p_out] else: orig_people = frame.get("people", []) if not isinstance(orig_people, list): frame_out["people"] = [p_out] else: replaced = False new_people = [] for op in orig_people: if (not replaced) and (op is chosen): new_people.append(p_out) replaced = True else: new_people.append(copy.deepcopy(op)) if not replaced: new_people = [p_out] + [copy.deepcopy(op) for op in orig_people] frame_out["people"] = new_people out_frames.append(frame_out) return out_frames # ============================================================ # === END: smooth_KPS_json.py logic # ============================================================ # ============================================================ # === START: render_pose_video.py logic (ported to frame render) # ============================================================ OP_COLORS: List[Tuple[int, int, int]] = [ (255, 0, 0), (255, 85, 0), (255, 170, 0), (255, 255, 0), (170, 255, 0), (85, 255, 0), (0, 255, 0), (0, 255, 85), (0, 255, 170), (0, 255, 255), (0, 170, 255), (0, 85, 255), (0, 0, 255), (85, 0, 255), (170, 0, 255), (255, 0, 255), (255, 0, 170), (255, 0, 85), ] BODY_EDGES: List[Tuple[int, int]] = [ (1, 2), (1, 5), (2, 3), (3, 4), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11), (11, 12), (12, 13), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), ] BODY_EDGE_COLORS = OP_COLORS[: len(BODY_EDGES)] BODY_JOINT_COLORS = OP_COLORS HAND_EDGES: List[Tuple[int, int]] = [ (0, 1), (1, 2), (2, 3), (3, 4), (0, 5), (5, 6), (6, 7), (7, 8), (0, 9), (9, 10), (10, 11), (11, 12), (0, 13), (13, 14), (14, 15), (15, 16), (0, 17), (17, 18), (18, 19), (19, 20), ] def _valid_pt(x: float, y: float, c: float, conf_thresh: float) -> bool: return (c is not None) and (c >= conf_thresh) and not (x == 0 and y == 0) def _hsv_to_bgr(h: float, s: float, v: float) -> Tuple[int, int, int]: H = int(np.clip(h, 0.0, 1.0) * 179.0) S = int(np.clip(s, 0.0, 1.0) * 255.0) V = int(np.clip(v, 0.0, 1.0) * 255.0) hsv = np.uint8([[[H, S, V]]]) bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0, 0] return int(bgr[0]), int(bgr[1]), int(bgr[2]) def _looks_normalized(points: List[Tuple[float, float, float]], conf_thresh: float) -> bool: valid = [(x, y, c) for (x, y, c) in points if _valid_pt(x, y, c, conf_thresh)] if not valid: return False in01 = sum(1 for (x, y, _) in valid if 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0) return (in01 / float(len(valid))) >= 0.7 def _draw_body( canvas: np.ndarray, pose: List[Tuple[float, float, float]], conf_thresh: float, xinsr_stick_scaling: bool = False ) -> None: CH, CW = canvas.shape[:2] stickwidth = 2 valid = [(x, y, c) for (x, y, c) in pose if _valid_pt(x, y, c, conf_thresh)] norm = False if valid: in01 = sum(1 for (x, y, _) in valid if 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0) norm = (in01 / float(len(valid))) >= 0.7 def to_px(x: float, y: float) -> Tuple[float, float]: if norm: return x * CW, y * CH return x, y max_side = max(CW, CH) if xinsr_stick_scaling: stick_scale = 1 if max_side < 500 else min(2 + (max_side // 1000), 7) else: stick_scale = 1 for idx, (a, b) in enumerate(BODY_EDGES): if a >= len(pose) or b >= len(pose): continue ax, ay, ac = pose[a] bx, by, bc = pose[b] if not (_valid_pt(ax, ay, ac, conf_thresh) and _valid_pt(bx, by, bc, conf_thresh)): continue ax, ay = to_px(ax, ay) bx, by = to_px(bx, by) base = BODY_EDGE_COLORS[idx] if idx < len(BODY_EDGE_COLORS) else (255, 255, 255) X = np.array([ay, by], dtype=np.float32) Y = np.array([ax, bx], dtype=np.float32) mX = float(np.mean(X)) mY = float(np.mean(Y)) length = float(np.hypot(X[0] - X[1], Y[0] - Y[1])) if length < 1.0: continue angle = math.degrees(math.atan2(X[0] - X[1], Y[0] - Y[1])) polygon = cv2.ellipse2Poly( (int(mY), int(mX)), (int(length / 2), int(stickwidth * stick_scale)), int(angle), 0, 360, 1, ) cv2.fillConvexPoly( canvas, polygon, (int(base[0] * 0.6), int(base[1] * 0.6), int(base[2] * 0.6)), ) for j, (x, y, c) in enumerate(pose): if not _valid_pt(x, y, c, conf_thresh): continue x, y = to_px(x, y) col = BODY_JOINT_COLORS[j] if j < len(BODY_JOINT_COLORS) else (255, 255, 255) cv2.circle(canvas, (int(x), int(y)), 2, col, thickness=-1) def _draw_hand(canvas: np.ndarray, hand: List[Tuple[float, float, float]], conf_thresh: float) -> None: if not hand or len(hand) < 21: return CH, CW = canvas.shape[:2] norm = _looks_normalized(hand, conf_thresh) def to_px(x: float, y: float) -> Tuple[float, float]: return (x * CW, y * CH) if norm else (x, y) n_edges = len(HAND_EDGES) for i, (a, b) in enumerate(HAND_EDGES): x1, y1, c1 = hand[a] x2, y2, c2 = hand[b] if _valid_pt(x1, y1, c1, conf_thresh) and _valid_pt(x2, y2, c2, conf_thresh): x1, y1 = to_px(x1, y1) x2, y2 = to_px(x2, y2) bgr = _hsv_to_bgr(i / float(n_edges), 1.0, 1.0) cv2.line(canvas, (int(x1), int(y1)), (int(x2), int(y2)), bgr, 1, cv2.LINE_AA) for x, y, c in hand: if _valid_pt(x, y, c, conf_thresh): x, y = to_px(x, y) cv2.circle(canvas, (int(x), int(y)), 1, (0, 0, 255), -1, cv2.LINE_AA) def _draw_face(canvas: np.ndarray, face: List[Tuple[float, float, float]], conf_thresh: float) -> None: if not face: return CH, CW = canvas.shape[:2] norm = _looks_normalized(face, conf_thresh) def to_px(x: float, y: float) -> Tuple[float, float]: return (x * CW, y * CH) if norm else (x, y) for x, y, c in face: if _valid_pt(x, y, c, conf_thresh): x, y = to_px(x, y) cv2.circle(canvas, (int(x), int(y)), 0, (255, 255, 255), -1, cv2.LINE_AA) def _draw_pose_frame_full( w: int, h: int, person: Dict[str, Any], conf_thresh_body: float = 0.10, conf_thresh_hands: float = 0.10, conf_thresh_face: float = 0.10, ) -> np.ndarray: img = np.zeros((h, w, 3), dtype=np.uint8) pose = _reshape_keypoints_2d(person.get("pose_keypoints_2d") or []) face = _reshape_keypoints_2d(person.get("face_keypoints_2d") or []) hand_l = _reshape_keypoints_2d(person.get("hand_left_keypoints_2d") or []) hand_r = _reshape_keypoints_2d(person.get("hand_right_keypoints_2d") or []) if pose: _draw_body(img, pose, conf_thresh_body) if hand_l: _draw_hand(img, hand_l, conf_thresh_hands) if hand_r: _draw_hand(img, hand_r, conf_thresh_hands) if face: _draw_face(img, face, conf_thresh_face) return img # ============================================================ # === END: render_pose_video.py logic # ============================================================ # ============================================================ # ComfyUI mappings # ============================================================ NODE_CLASS_MAPPINGS = { "TSPoseDataSmoother": KPSSmoothPoseDataAndRender, } NODE_DISPLAY_NAME_MAPPINGS = { "TSPoseDataSmoother": "KPS: Smooth + Render (pose_data/PKL)", }