File size: 4,805 Bytes
1a01fdb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import gradio as gr
import cv2
import numpy as np
from gradio_webrtc import WebRTC
from twilio.rest import Client
import os
import spaces
from threading import Lock
from collections import defaultdict
import time
from bisect import bisect_left
from scipy.spatial.distance import cdist
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
from mediapipe import solutions
import pdb
import json
from moviepy.editor import VideoFileClip
import librosa

# Enable the GPU delegate for the MediaPipe task runners.
# NOTE(review): this first BaseOptions (gesture_recognizer.task) is rebound to
# hand_landmarker.task below before it is ever used — confirm whether a
# GestureRecognizer was meant to be created from it.
base_options = mp.tasks.BaseOptions(model_asset_path='gesture_recognizer.task',
    delegate=mp.tasks.BaseOptions.Delegate.GPU)

mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands

# Hand-landmark detector in VIDEO mode: detect_for_video() requires strictly
# increasing timestamps across calls (see previous_timestamp below).
base_options=mp.tasks.BaseOptions(model_asset_path='hand_landmarker.task',
    delegate=mp.tasks.BaseOptions.Delegate.GPU)
options = vision.HandLandmarkerOptions(base_options=base_options,
    running_mode=mp.tasks.vision.RunningMode.VIDEO,
    num_hands=2)
detector = vision.HandLandmarker.create_from_options(options)

# Second detector instance for one-off IMAGE-mode inference (shares the same
# GPU base_options / model file).
options_image = vision.HandLandmarkerOptions(base_options=base_options,
    running_mode=mp.tasks.vision.RunningMode.IMAGE,
    num_hands=2)
detector_image = vision.HandLandmarker.create_from_options(options_image)

# Default display size (width, height). NOTE(review): shadowed by a local
# video_size inside ReferenceVideo.load_video, so frames there are resized to
# the source video's own size, not to (500, 500) — confirm which was intended.
video_size = (500, 500)

# Last timestamp (ms) fed to the VIDEO-mode detector; module-global so the
# timestamps stay strictly increasing even across multiple loaded videos.
previous_timestamp = None

class ReferenceVideo:
    """Hand-landmark keypoints and raw frames extracted from one reference video.

    Attributes:
        keypoints: dict mapping handedness label ("Left"/"Right") to a per-frame
            list; each entry is a list of (x, y, z) landmark tuples, or None
            when that hand was not detected in the frame.
        frames: list of BGR frames as read by OpenCV (resized — see load_video).
    """

    def __init__(self):
        self.keypoints = {"Left": [], "Right": []}
        self.frames = []  # raw video frames

    def load_video(self, video_path):
        """Extract per-frame hand landmarks, frames, and A/V metadata.

        Side effects:
            * writes ``<video_path stem>_keypoints.json`` and
              ``<video_path stem>_meta.json`` next to the input file;
            * advances the module-global ``previous_timestamp`` so that
              timestamps remain strictly increasing across successive videos
              fed to the same VIDEO-mode MediaPipe detector.

        Args:
            video_path: path to a video file readable by both MoviePy and
                OpenCV.
        """
        global previous_timestamp
        self.keypoints = {"Left": [], "Right": []}
        self.frames = []

        video = VideoFileClip(video_path)
        fps = video.fps  # persisted in the metadata JSON
        # NOTE(review): shadows the module-level (500, 500); frames end up
        # resized to the source video's own size — confirm intent.
        video_size = (video.size[0], video.size[1])
        audio = video.audio.to_soundarray()
        original_sr = video.audio.fps
        # librosa expects channels-first, hence the transposes around resample.
        audio = librosa.resample(audio.T, orig_sr=original_sr, target_sr=48000).T

        if audio.ndim == 2 and audio.shape[1] == 2:
            audio = 0.5 * (audio[:, 0] + audio[:, 1])  # stereo -> mono
        audio = audio.astype(np.float32)

        cap = cv2.VideoCapture(video_path)
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Independent copy of the frame data for MediaPipe (RGB order).
                rgb = mp.Image(image_format=mp.ImageFormat.SRGB,
                               data=np.array(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))

                # VIDEO mode requires strictly increasing timestamps (ms);
                # bump by 1 ms when the container reports a duplicate/regressed
                # position (also keeps monotonicity across multiple videos).
                timestamp_ms = int(cap.get(cv2.CAP_PROP_POS_MSEC))
                if previous_timestamp is not None and timestamp_ms <= previous_timestamp:
                    timestamp_ms = previous_timestamp + 1
                previous_timestamp = timestamp_ms

                results = detector.detect_for_video(rgb, timestamp_ms)

                # One entry per frame per hand; None when that hand is absent.
                frame_landmarks = {"Left": None, "Right": None}
                if results.hand_landmarks and results.handedness:
                    for idx, hand_landmarks in enumerate(results.hand_landmarks):
                        label = results.handedness[idx][0].category_name
                        frame_landmarks[label] = [(lm.x, lm.y, lm.z) for lm in hand_landmarks]

                self.keypoints["Left"].append(frame_landmarks["Left"])
                self.keypoints["Right"].append(frame_landmarks["Right"])
                self.frames.append(cv2.resize(frame, video_size))

                # Explicitly release detector output and image — frees the
                # GPU-backed resources they hold before the next iteration.
                del results
                del rgb
        finally:
            # Release capture/clip even if decoding or detection raises.
            cap.release()
            video.close()

        # BUG FIX: dump once after processing instead of once per frame
        # (the original rewrote the whole JSON file inside the loop: O(n^2) I/O).
        output_path = os.path.splitext(video_path)[0] + "_keypoints.json"
        with open(output_path, "w") as f:
            json.dump(self.keypoints, f)

        metadata = {"fps": fps, "video_size": video_size}
        with open(f"{os.path.splitext(video_path)[0]}_meta.json", "w") as f:
            json.dump(metadata, f)

# Shared store for the currently loaded reference video; each load_video call
# below overwrites its keypoints/frames and writes the JSON sidecar files.
ref_video = ReferenceVideo()

# Predefined clips processed at import time, in order. After the loop,
# ref_video holds the landmarks of the LAST path only; the earlier clip's
# results persist only in its *_keypoints.json / *_meta.json sidecars.
video_paths = ['predefined/Move12_preview.mp4', 'predefined/Move12_main.mp4']

for video_path in video_paths:
    ref_video.load_video(video_path)