# finger/keypoints_process.py
import gradio as gr
import cv2
import numpy as np
from gradio_webrtc import WebRTC
from twilio.rest import Client
import os
import spaces
from threading import Lock
from collections import defaultdict
import time
from bisect import bisect_left
from scipy.spatial.distance import cdist
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
from mediapipe import solutions
import json
from moviepy.editor import VideoFileClip
import librosa
# Enable the GPU delegate for the MediaPipe tasks.
# NOTE: these gesture-recognizer options are currently unused in this module;
# they are kept under their own name so they do not shadow the hand-landmarker
# options that follow.
gesture_base_options = mp.tasks.BaseOptions(
    model_asset_path='gesture_recognizer.task',
    delegate=mp.tasks.BaseOptions.Delegate.GPU)
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands
base_options = mp.tasks.BaseOptions(
    model_asset_path='hand_landmarker.task',
    delegate=mp.tasks.BaseOptions.Delegate.GPU)
# Hand landmarker in VIDEO mode: frames must be fed through detect_for_video
# with monotonically increasing timestamps.
options = vision.HandLandmarkerOptions(
    base_options=base_options,
    running_mode=mp.tasks.vision.RunningMode.VIDEO,
    num_hands=2)
detector = vision.HandLandmarker.create_from_options(options)

# A second landmarker in IMAGE mode for single-frame detection.
options_image = vision.HandLandmarkerOptions(
    base_options=base_options,
    running_mode=mp.tasks.vision.RunningMode.IMAGE,
    num_hands=2)
detector_image = vision.HandLandmarker.create_from_options(options_image)
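# For reference, a minimal sketch of how the IMAGE-mode detector above could
# be used on a single still image. This is a hypothetical helper that is not
# called anywhere in this module; the function name and image_path argument
# are illustrative only.
def detect_image_landmarks(image_path):
    image = mp.Image.create_from_file(image_path)
    result = detector_image.detect(image)
    out = {"Left": None, "Right": None}
    if result.hand_landmarks and result.handedness:
        for idx, hand in enumerate(result.hand_landmarks):
            label = result.handedness[idx][0].category_name
            out[label] = [(lm.x, lm.y, lm.z) for lm in hand]
    return out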
video_size = (500, 500)  # default frame size (shadowed by the per-video size inside load_video)
previous_timestamp = None  # last timestamp fed to the VIDEO-mode detector; must be strictly increasing
class ReferenceVideo:
    def __init__(self):
        self.keypoints = {"Left": [], "Right": []}
        # self.timestamps = []
        # self.duration = 0
        self.frames = []  # store the raw video frames
    def load_video(self, video_path):
        global previous_timestamp
        self.keypoints = {"Left": [], "Right": []}
        self.frames = []
        video = VideoFileClip(video_path)
        fps = video.fps  # keep the fps for the metadata file
        video_size = (video.size[0], video.size[1])
        # Decode the audio track and resample it to 48 kHz.
        # NOTE: the resampled audio is currently unused (the np.save below is commented out).
        audio = video.audio.to_soundarray()
        original_sr = video.audio.fps
        audio = librosa.resample(audio.T, orig_sr=original_sr, target_sr=48000).T
        if audio.ndim == 2 and audio.shape[1] == 2:
            audio = 0.5 * (audio[:, 0] + audio[:, 1])  # stereo to mono
        audio = audio.astype(np.float32)
        cap = cv2.VideoCapture(video_path)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # 1. Explicitly copy the frame into an independent RGB buffer.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB).copy()
            rgb = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
            # 2. Build a strictly increasing timestamp (in milliseconds),
            #    as detect_for_video requires.
            timestamp_ms = int(cap.get(cv2.CAP_PROP_POS_MSEC))
            if previous_timestamp is not None and timestamp_ms <= previous_timestamp:
                timestamp_ms = previous_timestamp + 1
            previous_timestamp = timestamp_ms
            # 3. Run the detector on this frame.
            results = detector.detect_for_video(rgb, timestamp_ms)
            # 4. Collect per-hand landmarks for this frame.
            frame_landmarks = {"Left": None, "Right": None}
            if results.hand_landmarks and results.handedness:
                for idx, hand_landmarks in enumerate(results.hand_landmarks):
                    label = results.handedness[idx][0].category_name
                    landmarks = [(lm.x, lm.y, lm.z) for lm in hand_landmarks]
                    frame_landmarks[label] = landmarks
            self.keypoints["Left"].append(frame_landmarks["Left"])
            self.keypoints["Right"].append(frame_landmarks["Right"])
            # self.timestamps.append(timestamp_ms / 1000)  # use the adjusted timestamp
            self.frames.append(cv2.resize(frame, video_size))
            # 5. Explicitly release per-frame resources.
            del results  # important: free GPU resources held by the detection result
            del rgb      # release the mp.Image instance
        # self.duration = self.timestamps[-1] if self.timestamps else 0
        cap.release()
        video.close()
        # Write the extracted keypoints once, after the whole video has been processed.
        output_path = os.path.splitext(video_path)[0] + "_keypoints.json"
        with open(output_path, "w") as f:
            json.dump(self.keypoints, f)
        # np.save(f"{os.path.splitext(video_path)[0]}_frames.npy", np.array(self.frames, dtype=np.uint8))
        # np.save(f"{os.path.splitext(video_path)[0]}_audio.npy", audio)
        metadata = {"fps": fps, "video_size": video_size}
        with open(f"{os.path.splitext(video_path)[0]}_meta.json", "w") as f:
            json.dump(metadata, f)
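# A small companion sketch of how the artifacts written by load_video can be
# read back. This is a hypothetical helper; it assumes the *_keypoints.json
# and *_meta.json files produced above exist next to the video file.
def load_reference_artifacts(video_path):
    base = os.path.splitext(video_path)[0]
    with open(base + "_keypoints.json") as f:
        # {"Left": [...], "Right": [...]}; one entry per frame, None when the
        # hand was not detected, otherwise 21 (x, y, z) tuples.
        keypoints = json.load(f)
    with open(base + "_meta.json") as f:
        metadata = json.load(f)  # {"fps": ..., "video_size": [width, height]}
    return keypoints, metadata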
# Precompute keypoints and metadata for the predefined reference videos.
ref_video = ReferenceVideo()
video_paths = ['predefined/Move12_preview.mp4', 'predefined/Move12_main.mp4']
for video_path in video_paths:
    ref_video.load_video(video_path)
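# One plausible use of the cdist import above: scoring how closely a live
# frame's hand landmarks match a reference frame. This is an illustrative
# sketch only; the function is not part of this module's pipeline.
def landmark_distance(live_landmarks, ref_landmarks):
    """Mean Euclidean distance between matched landmarks (21 x 3 arrays)."""
    if live_landmarks is None or ref_landmarks is None:
        return float("inf")
    a = np.asarray(live_landmarks, dtype=np.float32)
    b = np.asarray(ref_landmarks, dtype=np.float32)
    # The diagonal of the pairwise distance matrix pairs landmark i with
    # landmark i, i.e. the distance between matched keypoints.
    return float(np.mean(np.diag(cdist(a, b))))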