import gradio as gr
import cv2
import numpy as np
from gradio_webrtc import WebRTC
from twilio.rest import Client
import os
import spaces
from threading import Lock
from collections import defaultdict
import time
from bisect import bisect_left
from scipy.spatial.distance import cdist
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
from mediapipe import solutions
import json
from moviepy.editor import VideoFileClip
import librosa

mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands

# Enable the GPU delegate for the hand landmarker. The same base options are
# shared by the VIDEO-mode detector (for reference clips) and the IMAGE-mode
# detector (for single frames).
base_options = mp.tasks.BaseOptions(model_asset_path='hand_landmarker.task',
                                    delegate=mp.tasks.BaseOptions.Delegate.GPU)
options = vision.HandLandmarkerOptions(base_options=base_options,
                                       running_mode=mp.tasks.vision.RunningMode.VIDEO,
                                       num_hands=2)
detector = vision.HandLandmarker.create_from_options(options)
options_image = vision.HandLandmarkerOptions(base_options=base_options,
                                             running_mode=mp.tasks.vision.RunningMode.IMAGE,
                                             num_hands=2)
detector_image = vision.HandLandmarker.create_from_options(options_image)

video_size = (500, 500)
previous_timestamp = None
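
# Minimal usage sketch (an illustration, not part of the original pipeline):
# the IMAGE-mode detector above is created but never exercised in this file.
# `debug_detect_single_frame` is a hypothetical helper showing how it could be
# applied to one OpenCV (BGR) frame; unlike VIDEO mode, no timestamp is needed.
def debug_detect_single_frame(frame_bgr):
    """Run one-shot hand detection on a single BGR frame and return landmarks."""
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB,
                        data=cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    result = detector_image.detect(mp_image)
    landmarks = {"Left": None, "Right": None}
    for idx, hand in enumerate(result.hand_landmarks):
        label = result.handedness[idx][0].category_name  # "Left" / "Right"
        landmarks[label] = [(lm.x, lm.y, lm.z) for lm in hand]
    return landmarks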

class ReferenceVideo:
    def __init__(self):
        self.keypoints = {"Left": [], "Right": []}
        # self.timestamps = []
        # self.duration = 0
        self.frames = []  # raw video frames, resized for display

    def load_video(self, video_path):
        global previous_timestamp
        self.keypoints = {"Left": [], "Right": []}
        self.frames = []

        video = VideoFileClip(video_path)
        fps = video.fps  # the fps must be stored for playback later
        video_size = (video.size[0], video.size[1])  # per-video size (shadows the module default)
        audio = video.audio.to_soundarray()
        original_sr = video.audio.fps
        # Resample to 48 kHz, then fold stereo down to mono.
        audio = librosa.resample(audio.T, orig_sr=original_sr, target_sr=48000).T
        if audio.ndim == 2 and audio.shape[1] == 2:
            audio = 0.5 * (audio[:, 0] + audio[:, 1])
        audio = audio.astype(np.float32)

        cap = cv2.VideoCapture(video_path)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # 1. Build an independent SRGB image for MediaPipe (cvtColor
            #    already allocates a fresh array, so no extra copy is needed).
            rgb = mp.Image(image_format=mp.ImageFormat.SRGB,
                           data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            # 2. Generate strictly increasing timestamps (in milliseconds).
            #    VIDEO mode requires monotonic timestamps across *all* calls to
            #    detect_for_video, including later videos fed to the same
            #    detector, which is why previous_timestamp is module-global.
            timestamp_ms = int(cap.get(cv2.CAP_PROP_POS_MSEC))
            if previous_timestamp is not None and timestamp_ms <= previous_timestamp:
                timestamp_ms = previous_timestamp + 1
            previous_timestamp = timestamp_ms

            # 3. Run the detector.
            results = detector.detect_for_video(rgb, timestamp_ms)

            # 4. Collect per-hand landmarks for this frame.
            frame_landmarks = {"Left": None, "Right": None}
            if results.hand_landmarks and results.handedness:
                for idx, hand_landmarks in enumerate(results.hand_landmarks):
                    label = results.handedness[idx][0].category_name
                    landmarks = [(lm.x, lm.y, lm.z) for lm in hand_landmarks]
                    frame_landmarks[label] = landmarks
            self.keypoints["Left"].append(frame_landmarks["Left"])
            self.keypoints["Right"].append(frame_landmarks["Right"])
            # self.timestamps.append(timestamp_ms / 1000)
            self.frames.append(cv2.resize(frame, video_size))

        output_path = os.path.splitext(video_path)[0] + "_keypoints.json"
        with open(output_path, "w") as f:
            json.dump(self.keypoints, f)

        # 5. Explicitly release resources: drop the last detection result and
        #    Image instance so their GPU-side buffers can be reclaimed.
        del results
        del rgb
        # self.duration = self.timestamps[-1] if self.timestamps else 0
        cap.release()
        video.close()

        # np.save(f"{os.path.splitext(video_path)[0]}_frames.npy", np.array(self.frames, dtype=np.uint8))
        # np.save(f"{os.path.splitext(video_path)[0]}_audio.npy", audio)
        metadata = {"fps": fps, "video_size": video_size}
        with open(f"{os.path.splitext(video_path)[0]}_meta.json", "w") as f:
            json.dump(metadata, f)


ref_video = ReferenceVideo()
video_paths = ['predefined/Move12_preview.mp4', 'predefined/Move12_main.mp4']
for video_path in video_paths:
    ref_video.load_video(video_path)
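
# Minimal sketch (a hypothetical helper, not called above): how a consumer
# could reload the artifacts that load_video() persists. The "_keypoints.json"
# and "_meta.json" suffixes mirror the paths written in load_video().
def load_reference_artifacts(video_path):
    """Reload keypoints and metadata saved by ReferenceVideo.load_video()."""
    stem = os.path.splitext(video_path)[0]
    with open(stem + "_keypoints.json") as f:
        keypoints = json.load(f)  # {"Left": [...], "Right": [...]}, one entry per frame
    with open(stem + "_meta.json") as f:
        metadata = json.load(f)   # {"fps": ..., "video_size": [w, h]}
    return keypoints, metadata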