|
|
import json |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import matplotlib.pyplot as plt |
|
|
import seaborn as sns |
|
|
from dataclasses import dataclass |
|
|
from typing import List, Dict, Tuple, Optional |
|
|
from datetime import datetime |
|
|
import os |
|
|
import cv2 |
|
|
import base64 |
|
|
from scipy.signal import savgol_filter |
|
|
import logging |
|
|
import io |
|
|
import scipy.ndimage as ndimage |
|
|
import shutil |
|
|
import re |
|
|
import subprocess |
|
|
import tempfile |
|
|
try: |
|
|
import ffmpeg |
|
|
except ImportError: |
|
|
print("ffmpeg-python not installed. Using subprocess for video processing.") |
|
|
|
|
|
@dataclass |
|
|
class FootprintArea: |
|
|
"""足印区域数据类""" |
|
|
area_id: str |
|
|
paw_type: str |
|
|
start_frame: int |
|
|
end_frame: int |
|
|
position: Dict[str, int] |
|
|
|
|
|
class GaitAnalysisReport: |
|
|
def __init__(self, json_path: str, pre_delay_ms: int = 0, post_delay_ms: int = 0, video_path: str = None, |
|
|
record_params: dict = None): |
|
|
"""初始化报告生成器 |
|
|
|
|
|
Args: |
|
|
json_path: 足印数据JSON文件路径 |
|
|
pre_delay_ms: 足印前的延时提取时间(毫秒) |
|
|
post_delay_ms: 足印后的延时提取时间(毫秒) |
|
|
video_path: 视频文件路径 |
|
|
record_params: 记录参数,包含实验记录相关信息 |
|
|
""" |
|
|
self.json_path = json_path |
|
|
self.pre_delay_ms = pre_delay_ms |
|
|
self.post_delay_ms = post_delay_ms |
|
|
self.video_path = video_path |
|
|
self.record_params = record_params or {} |
|
|
self.fps = 120 |
|
|
self.pre_delay_frames = int(pre_delay_ms / 1000 * self.fps) |
|
|
self.post_delay_frames = int(post_delay_ms / 1000 * self.fps) |
|
|
self.result_dir = self._create_result_dir() |
|
|
self.footprint_areas = [] |
|
|
|
|
|
|
|
|
self.colors = { |
|
|
'RF': '#2196F3', |
|
|
'RH': '#9C27B0', |
|
|
'LF': '#FFD700', |
|
|
'LH': '#4CAF50' |
|
|
} |
|
|
|
|
|
self._load_data() |
|
|
|
|
|
def _create_result_dir(self) -> str: |
|
|
"""创建结果目录""" |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
result_dir = f"results/analysis_{timestamp}" |
|
|
|
|
|
|
|
|
for subdir in ['data', 'plots']: |
|
|
os.makedirs(f"{result_dir}/{subdir}", exist_ok=True) |
|
|
|
|
|
return result_dir |
|
|
|
|
|
def _load_data(self): |
|
|
"""加载并预处理JSON数据""" |
|
|
with open(self.json_path, 'r') as f: |
|
|
data = json.load(f) |
|
|
|
|
|
|
|
|
area_prints = {} |
|
|
|
|
|
|
|
|
for frame in data['frames']: |
|
|
frame_id = frame['frameId'] |
|
|
for footprint in frame['footprints']: |
|
|
area_id = footprint['footprintAreaId'] |
|
|
if area_id not in area_prints: |
|
|
area_prints[area_id] = [] |
|
|
|
|
|
area_prints[area_id].append({ |
|
|
'frame_id': frame_id, |
|
|
'position': footprint['position'], |
|
|
'type': footprint['type'] |
|
|
}) |
|
|
|
|
|
|
|
|
for area_id, prints in area_prints.items(): |
|
|
|
|
|
x_coords = [] |
|
|
y_coords = [] |
|
|
frame_ids = [] |
|
|
paw_type = prints[0]['type'] |
|
|
|
|
|
for p in prints: |
|
|
x_coords.extend([ |
|
|
p['position']['x'], |
|
|
p['position']['x'] + p['position']['width'] |
|
|
]) |
|
|
y_coords.extend([ |
|
|
p['position']['y'], |
|
|
p['position']['y'] + p['position']['height'] |
|
|
]) |
|
|
frame_ids.append(p['frame_id']) |
|
|
|
|
|
|
|
|
x_min = min(x_coords) |
|
|
y_min = min(y_coords) |
|
|
width = max(x_coords) - x_min |
|
|
height = max(y_coords) - y_min |
|
|
|
|
|
|
|
|
self.footprint_areas.append( |
|
|
FootprintArea( |
|
|
area_id=area_id, |
|
|
paw_type=paw_type, |
|
|
start_frame=min(frame_ids), |
|
|
end_frame=max(frame_ids), |
|
|
position={ |
|
|
'x': x_min, |
|
|
'y': y_min, |
|
|
'width': width, |
|
|
'height': height |
|
|
} |
|
|
) |
|
|
) |
|
|
|
|
|
def analyze_stance_time(self): |
|
|
"""分析足印支撑时间并生成可视化""" |
|
|
print("\n开始分析足印支撑时间...") |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
for paw_type in paw_groups: |
|
|
paw_groups[paw_type].sort(key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
stance_times = {paw_type: [] for paw_type in paw_groups} |
|
|
cycle_times = {paw_type: [] for paw_type in paw_groups} |
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
for i, footprint in enumerate(footprints): |
|
|
|
|
|
stance_time = (footprint.end_frame - footprint.start_frame) / self.fps |
|
|
stance_times[paw_type].append(stance_time) |
|
|
|
|
|
|
|
|
if i < len(footprints) - 1: |
|
|
cycle_time = (footprints[i+1].start_frame - footprint.start_frame) / self.fps |
|
|
cycle_times[paw_type].append(cycle_time) |
|
|
|
|
|
|
|
|
stats = { |
|
|
'stance_time': { |
|
|
paw_type: { |
|
|
'mean': np.mean(times), |
|
|
'std': np.std(times), |
|
|
'min': np.min(times), |
|
|
'max': np.max(times) |
|
|
} for paw_type, times in stance_times.items() |
|
|
}, |
|
|
'cycle_time': { |
|
|
paw_type: { |
|
|
'mean': np.mean(times), |
|
|
'std': np.std(times), |
|
|
'min': np.min(times), |
|
|
'max': np.max(times) |
|
|
} for paw_type, times in cycle_times.items() if times |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
self._plot_stance_timeline(paw_groups) |
|
|
|
|
|
|
|
|
self._plot_stance_stats(stance_times) |
|
|
|
|
|
|
|
|
self._save_stance_stats(stats) |
|
|
|
|
|
print("足印支撑时间分析完成!") |
|
|
return stats |
|
|
|
|
|
def _get_time_range(self) -> Tuple[float, float]: |
|
|
"""获取所有足印的时间范围,并添加边距""" |
|
|
start_frames = [area.start_frame for area in self.footprint_areas] |
|
|
end_frames = [area.end_frame for area in self.footprint_areas] |
|
|
|
|
|
|
|
|
time_min = min(start_frames) / self.fps |
|
|
time_max = max(end_frames) / self.fps |
|
|
time_range = time_max - time_min |
|
|
padding = time_range * 0.1 |
|
|
|
|
|
return max(0, time_min - padding), time_max + padding |
|
|
|
|
|
def _plot_stance_timeline(self, paw_groups: Dict[str, List[FootprintArea]]): |
|
|
"""绘制支撑时序图""" |
|
|
plt.figure(figsize=(15, 6)) |
|
|
|
|
|
|
|
|
time_min, time_max = self._get_time_range() |
|
|
|
|
|
|
|
|
plt.xlim(time_min, time_max) |
|
|
|
|
|
|
|
|
y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1} |
|
|
|
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
y = y_positions[paw_type] |
|
|
for footprint in footprints: |
|
|
start_time = footprint.start_frame / self.fps |
|
|
end_time = footprint.end_frame / self.fps |
|
|
plt.hlines(y=y, xmin=start_time, xmax=end_time, |
|
|
color=self.colors[paw_type], linewidth=8, alpha=0.6) |
|
|
|
|
|
plt.yticks(list(y_positions.values()), list(y_positions.keys())) |
|
|
plt.xlabel('Time (seconds)') |
|
|
plt.title('Gait Stance Timeline') |
|
|
plt.grid(True, axis='x', alpha=0.3) |
|
|
|
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/stance_timeline.png', dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
def _plot_stance_stats(self, stance_times: Dict[str, List[float]]): |
|
|
"""绘制支撑时间统计图""" |
|
|
plt.figure(figsize=(10, 6)) |
|
|
plt.style.use('seaborn-v0_8-whitegrid') |
|
|
|
|
|
paw_order = ['RF', 'RH', 'LF', 'LH'] |
|
|
|
|
|
box_plot = plt.boxplot([stance_times[paw] for paw in paw_order], |
|
|
tick_labels=paw_order, |
|
|
patch_artist=True, |
|
|
medianprops={'color': 'black'}, |
|
|
flierprops={'marker': 'o', |
|
|
'markerfacecolor': 'none', |
|
|
'markersize': 4}) |
|
|
|
|
|
|
|
|
for patch, paw in zip(box_plot['boxes'], paw_order): |
|
|
patch.set_facecolor(self.colors[paw]) |
|
|
patch.set_alpha(0.6) |
|
|
|
|
|
for flier in box_plot['fliers']: |
|
|
flier.set_color(self.colors[paw]) |
|
|
|
|
|
|
|
|
for i, paw in enumerate(paw_order, 1): |
|
|
plt.scatter([i] * len(stance_times[paw]), |
|
|
stance_times[paw], |
|
|
color=self.colors[paw], |
|
|
alpha=0.4, |
|
|
s=30) |
|
|
|
|
|
|
|
|
plt.title('Stance Time Distribution by Paw Type', pad=20) |
|
|
plt.xlabel('Paw Type', labelpad=10) |
|
|
plt.ylabel('Stance Time (s)', labelpad=10) |
|
|
|
|
|
|
|
|
plt.grid(True, axis='y', linestyle='--', alpha=0.3) |
|
|
|
|
|
|
|
|
plt.tight_layout() |
|
|
|
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/stance_stats.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
def _save_stance_stats(self, stats: Dict): |
|
|
"""保存统计数据到CSV""" |
|
|
|
|
|
stance_df = pd.DataFrame.from_dict( |
|
|
{(i, j): stats['stance_time'][i][j] |
|
|
for i in stats['stance_time'].keys() |
|
|
for j in stats['stance_time'][i].keys()}, |
|
|
orient='index' |
|
|
) |
|
|
|
|
|
|
|
|
cycle_df = pd.DataFrame.from_dict( |
|
|
{(i, j): stats['cycle_time'][i][j] |
|
|
for i in stats['cycle_time'].keys() |
|
|
for j in stats['cycle_time'][i].keys()}, |
|
|
orient='index' |
|
|
) |
|
|
|
|
|
|
|
|
stance_df = stance_df.round(3) |
|
|
cycle_df = cycle_df.round(3) |
|
|
|
|
|
|
|
|
stance_df.to_csv(f'{self.result_dir}/data/stance_time_stats.csv') |
|
|
cycle_df.to_csv(f'{self.result_dir}/data/cycle_time_stats.csv') |
|
|
|
|
|
def analyze_gait_sequence(self): |
|
|
"""分析步态序列并绘制示意图""" |
|
|
print("\n开始分析步态序列...") |
|
|
|
|
|
|
|
|
GAIT_PATTERNS = { |
|
|
'LH-RF-RH-LF': 'Aa', |
|
|
'LH-LF-RH-RF': 'Ab', |
|
|
'LH-RF-LF-RH': 'Ca', |
|
|
'LH-RH-LF-RF': 'Cb', |
|
|
'LH-RH-RF-LF': 'Ra', |
|
|
'LH-LF-RF-RH': 'Rb' |
|
|
} |
|
|
|
|
|
|
|
|
all_footprints = [] |
|
|
for area in self.footprint_areas: |
|
|
all_footprints.append({ |
|
|
'paw_type': area.paw_type, |
|
|
'start_time': area.start_frame / self.fps, |
|
|
'color': self.colors[area.paw_type] |
|
|
}) |
|
|
|
|
|
|
|
|
all_footprints.sort(key=lambda x: x['start_time']) |
|
|
|
|
|
|
|
|
sequences = [] |
|
|
current_seq = [] |
|
|
sequence_labels = [] |
|
|
|
|
|
for fp in all_footprints: |
|
|
if not current_seq and fp['paw_type'] == 'LH': |
|
|
current_seq = [fp] |
|
|
elif current_seq: |
|
|
current_seq.append(fp) |
|
|
if len(current_seq) == 4: |
|
|
|
|
|
pattern = '-'.join([p['paw_type'] for p in current_seq]) |
|
|
if pattern in GAIT_PATTERNS: |
|
|
sequences.append(current_seq) |
|
|
sequence_labels.append(( |
|
|
current_seq[0]['start_time'], |
|
|
GAIT_PATTERNS[pattern] |
|
|
)) |
|
|
current_seq = [] |
|
|
|
|
|
|
|
|
plt.figure(figsize=(15, 4)) |
|
|
|
|
|
|
|
|
time_min, time_max = self._get_time_range() |
|
|
plt.xlim(time_min, time_max) |
|
|
|
|
|
|
|
|
y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1} |
|
|
|
|
|
|
|
|
for i, footprint in enumerate(all_footprints): |
|
|
y = y_positions[footprint['paw_type']] |
|
|
x = footprint['start_time'] |
|
|
|
|
|
|
|
|
plt.scatter(x, y, c=footprint['color'], s=100, alpha=0.6, |
|
|
marker='o', label=footprint['paw_type'] if i < 4 else "") |
|
|
|
|
|
|
|
|
if i < len(all_footprints) - 1: |
|
|
next_print = all_footprints[i+1] |
|
|
plt.plot([x, next_print['start_time']], |
|
|
[y, y_positions[next_print['paw_type']]], |
|
|
'--', color='gray', alpha=0.3) |
|
|
|
|
|
|
|
|
for time, pattern in sequence_labels: |
|
|
plt.text(time, 0.5, pattern, rotation=45, ha='right', va='top', fontsize=8) |
|
|
|
|
|
plt.yticks(list(y_positions.values()), list(y_positions.keys())) |
|
|
plt.xlabel('Time (seconds)') |
|
|
plt.title('Gait Pattern') |
|
|
plt.grid(True, alpha=0.3) |
|
|
|
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/gait_sequence.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
print("步态序列分析完成!") |
|
|
return sequence_labels |
|
|
|
|
|
def analyze_pressure_timeline(self): |
|
|
"""分析并绘制足印压力时序图""" |
|
|
print("\n开始分析足印压力时序图...") |
|
|
|
|
|
if not self.video_path: |
|
|
raise ValueError("未指定视频文件路径") |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(self.video_path) |
|
|
if not cap.isOpened(): |
|
|
raise ValueError(f"无法打开视频文件: {self.video_path}") |
|
|
|
|
|
|
|
|
pressure_threshold = 5 |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append({ |
|
|
'start_frame': max(0, area.start_frame - self.pre_delay_frames), |
|
|
'end_frame': area.end_frame + self.post_delay_frames, |
|
|
'original_start': area.start_frame, |
|
|
'original_end': area.end_frame, |
|
|
'position': area.position |
|
|
}) |
|
|
|
|
|
|
|
|
plt.figure(figsize=(15, 8)) |
|
|
|
|
|
for i, (paw_type, footprints) in enumerate(paw_groups.items()): |
|
|
plt.subplot(4, 1, i+1) |
|
|
|
|
|
for footprint in footprints: |
|
|
|
|
|
max_pressures = [0] |
|
|
avg_pressures = [0] |
|
|
times = [footprint['start_frame'] / self.fps] |
|
|
|
|
|
for frame_idx in range(footprint['start_frame'], footprint['end_frame']): |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
continue |
|
|
|
|
|
x = max(0, footprint['position']['x']) |
|
|
y = max(0, footprint['position']['y']) |
|
|
w = footprint['position']['width'] |
|
|
h = footprint['position']['height'] |
|
|
roi = frame[y:y+h, x:x+w] |
|
|
|
|
|
if roi.size > 0: |
|
|
|
|
|
g_values = roi[:, :, 1] |
|
|
max_pressures.append(float(np.max(g_values))) |
|
|
avg_pressures.append(float(np.mean(g_values))) |
|
|
times.append(frame_idx / self.fps) |
|
|
|
|
|
|
|
|
max_pressures.append(0) |
|
|
avg_pressures.append(0) |
|
|
times.append(footprint['end_frame'] / self.fps) |
|
|
|
|
|
|
|
|
if times: |
|
|
plt.plot(times, max_pressures, '--', color=self.colors[paw_type], |
|
|
alpha=0.8, label='Max Pressure' if i == 0 else '') |
|
|
plt.plot(times, avg_pressures, '-', color=self.colors[paw_type], |
|
|
alpha=0.5, label='Avg Pressure' if i == 0 else '') |
|
|
plt.fill_between(times, 0, avg_pressures, color=self.colors[paw_type], alpha=0.2) |
|
|
|
|
|
|
|
|
plt.legend(bbox_to_anchor=(1.05, 4.5), loc='upper left') |
|
|
|
|
|
|
|
|
plt.tight_layout() |
|
|
|
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/pressure_timeline.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
cap.release() |
|
|
print("Pressure timeline analysis completed!") |
|
|
|
|
|
def analyze_area_timeline(self): |
|
|
"""分析并绘制足印面积时序图""" |
|
|
print("\n开始分析足印面积时序图...") |
|
|
|
|
|
if not self.video_path: |
|
|
raise ValueError("未指定视频文件路径") |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(self.video_path) |
|
|
if not cap.isOpened(): |
|
|
raise ValueError(f"无法打开视频文件: {self.video_path}") |
|
|
|
|
|
|
|
|
pressure_threshold = 5 |
|
|
|
|
|
|
|
|
|
|
|
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) * 10 |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append({ |
|
|
'start_frame': max(0, area.start_frame - self.pre_delay_frames), |
|
|
'end_frame': area.end_frame + self.post_delay_frames, |
|
|
'original_start': area.start_frame, |
|
|
'original_end': area.end_frame, |
|
|
'position': area.position |
|
|
}) |
|
|
|
|
|
|
|
|
plt.figure(figsize=(15, 10)) |
|
|
|
|
|
|
|
|
for i, (paw_type, footprints) in enumerate(paw_groups.items()): |
|
|
ax = plt.subplot(4, 1, i+1) |
|
|
|
|
|
for footprint in footprints: |
|
|
|
|
|
areas = [0] |
|
|
times = [footprint['start_frame'] / self.fps] |
|
|
|
|
|
for frame_idx in range(footprint['start_frame'], footprint['end_frame']): |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
continue |
|
|
|
|
|
x = max(0, footprint['position']['x']) |
|
|
y = max(0, footprint['position']['y']) |
|
|
w = footprint['position']['width'] |
|
|
h = footprint['position']['height'] |
|
|
roi = frame[y:y+h, x:x+w] |
|
|
|
|
|
if roi.size > 0: |
|
|
|
|
|
g_values = roi[:, :, 1] |
|
|
|
|
|
area_pixels = np.sum(g_values > pressure_threshold) |
|
|
|
|
|
area_mm2 = area_pixels * (scale_factor ** 2) |
|
|
areas.append(float(area_mm2)) |
|
|
times.append(frame_idx / self.fps) |
|
|
|
|
|
|
|
|
areas.append(0) |
|
|
times.append(footprint['end_frame'] / self.fps) |
|
|
|
|
|
|
|
|
if times: |
|
|
plt.plot(times, areas, '-', color=self.colors[paw_type], |
|
|
alpha=0.7, label='Area' if i == 0 else '') |
|
|
plt.fill_between(times, 0, areas, color=self.colors[paw_type], alpha=0.2) |
|
|
|
|
|
|
|
|
plt.ylabel(f'{paw_type} Area (mm²)') |
|
|
plt.title(f'{paw_type} Footprint Area Timeline') |
|
|
plt.grid(True, alpha=0.3) |
|
|
|
|
|
|
|
|
if i < 3: |
|
|
plt.setp(ax.get_xticklabels(), visible=False) |
|
|
|
|
|
|
|
|
plt.legend(bbox_to_anchor=(1.05, 4.5), loc='upper left') |
|
|
|
|
|
|
|
|
plt.xlabel('Time (seconds)') |
|
|
|
|
|
|
|
|
plt.subplots_adjust(hspace=0.4) |
|
|
|
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/area_timeline.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
cap.release() |
|
|
print("足印面积时序图分析完成!") |
|
|
|
|
|
def analyze_swing_metrics(self): |
|
|
"""分析摆动相关指标""" |
|
|
print("\n开始分析摆动相关指标...") |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
|
|
|
swing_metrics = {paw_type: [] for paw_type in paw_groups} |
|
|
stride_durations = {paw_type: [] for paw_type in paw_groups} |
|
|
swing_percentages = {paw_type: [] for paw_type in paw_groups} |
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
|
|
|
footprints.sort(key=lambda x: x.start_frame) |
|
|
|
|
|
for i in range(len(footprints) - 1): |
|
|
current_print = footprints[i] |
|
|
next_print = footprints[i + 1] |
|
|
|
|
|
|
|
|
swing_duration = (next_print.start_frame - current_print.end_frame) / self.fps |
|
|
swing_metrics[paw_type].append(swing_duration) |
|
|
|
|
|
|
|
|
stride_duration = (next_print.start_frame - current_print.start_frame) / self.fps |
|
|
stride_durations[paw_type].append(stride_duration) |
|
|
|
|
|
|
|
|
swing_percentage = (swing_duration / stride_duration) * 100 |
|
|
swing_percentages[paw_type].append(swing_percentage) |
|
|
|
|
|
|
|
|
self._plot_swing_stats(swing_metrics, "Swing Duration (s)", "swing_duration") |
|
|
|
|
|
|
|
|
self._plot_swing_stats(swing_percentages, "Swing Percentage (%)", "swing_percentage") |
|
|
|
|
|
|
|
|
stats = { |
|
|
'swing_duration': { |
|
|
paw_type: { |
|
|
'mean': np.mean(durations), |
|
|
'std': np.std(durations), |
|
|
'min': np.min(durations), |
|
|
'max': np.max(durations) |
|
|
} for paw_type, durations in swing_metrics.items() if durations |
|
|
}, |
|
|
'swing_percentage': { |
|
|
paw_type: { |
|
|
'mean': np.mean(percentages), |
|
|
'std': np.std(percentages), |
|
|
'min': np.min(percentages), |
|
|
'max': np.max(percentages) |
|
|
} for paw_type, percentages in swing_percentages.items() if percentages |
|
|
}, |
|
|
'stride_duration': { |
|
|
paw_type: { |
|
|
'mean': np.mean(durations), |
|
|
'std': np.std(durations), |
|
|
'min': np.min(durations), |
|
|
'max': np.max(durations) |
|
|
} for paw_type, durations in stride_durations.items() if durations |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
self._save_swing_stats(stats) |
|
|
|
|
|
print("摆动相关指标分析完成!") |
|
|
return stats |
|
|
|
|
|
def _plot_swing_stats(self, data: Dict[str, List[float]], ylabel: str, filename: str): |
|
|
"""绘制摆动相关指标的统计图""" |
|
|
plt.figure(figsize=(10, 6)) |
|
|
|
|
|
|
|
|
plt.style.use('seaborn-v0_8-whitegrid') |
|
|
colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'} |
|
|
|
|
|
|
|
|
paw_order = ['RF', 'RH', 'LF', 'LH'] |
|
|
|
|
|
|
|
|
box_plot = plt.boxplot([data[paw] for paw in paw_order], |
|
|
tick_labels=paw_order, |
|
|
patch_artist=True, |
|
|
medianprops={'color': 'black'}, |
|
|
flierprops={'marker': 'o', |
|
|
'markerfacecolor': 'none', |
|
|
'markersize': 4}) |
|
|
|
|
|
|
|
|
for patch, paw in zip(box_plot['boxes'], paw_order): |
|
|
patch.set_facecolor(colors[paw]) |
|
|
patch.set_alpha(0.6) |
|
|
|
|
|
for flier in box_plot['fliers']: |
|
|
flier.set_color(colors[paw]) |
|
|
|
|
|
|
|
|
for i, paw in enumerate(paw_order, 1): |
|
|
if paw in data: |
|
|
plt.scatter([i] * len(data[paw]), |
|
|
data[paw], |
|
|
color=colors[paw], |
|
|
alpha=0.4, |
|
|
s=30) |
|
|
|
|
|
plt.title(f'{ylabel} Distribution by Paw Type', pad=20) |
|
|
plt.xlabel('Paw Type', labelpad=10) |
|
|
plt.ylabel(ylabel, labelpad=10) |
|
|
plt.grid(True, axis='y', linestyle='--', alpha=0.3) |
|
|
plt.tight_layout() |
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/{filename}.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
def _save_swing_stats(self, stats: Dict): |
|
|
"""保存摆动相关指标的统计数据到CSV""" |
|
|
for metric, data in stats.items(): |
|
|
df = pd.DataFrame.from_dict( |
|
|
{(i, j): data[i][j] |
|
|
for i in data.keys() |
|
|
for j in data[i].keys()}, |
|
|
orient='index' |
|
|
) |
|
|
|
|
|
if metric in ['swing_duration', 'stride_duration']: |
|
|
df = df.round(3) |
|
|
else: |
|
|
df = df.round(1) |
|
|
df.to_csv(f'{self.result_dir}/data/{metric}_stats.csv') |
|
|
|
|
|
def analyze_stride_length(self): |
|
|
"""分析步幅长度""" |
|
|
print("\n开始分析步幅长度...") |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
|
|
|
stride_lengths = {paw_type: [] for paw_type in paw_groups} |
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
|
|
|
footprints.sort(key=lambda x: x.start_frame) |
|
|
|
|
|
for i in range(len(footprints) - 1): |
|
|
current_print = footprints[i] |
|
|
next_print = footprints[i + 1] |
|
|
|
|
|
|
|
|
current_x = current_print.position['x'] + current_print.position['width'] / 2 |
|
|
next_x = next_print.position['x'] + next_print.position['width'] / 2 |
|
|
|
|
|
stride_length = abs(next_x - current_x) |
|
|
stride_lengths[paw_type].append(stride_length) |
|
|
|
|
|
|
|
|
self._plot_stride_stats(stride_lengths) |
|
|
|
|
|
|
|
|
stats = { |
|
|
paw_type: { |
|
|
'mean': np.mean(lengths), |
|
|
'std': np.std(lengths), |
|
|
'min': np.min(lengths), |
|
|
'max': np.max(lengths) |
|
|
} for paw_type, lengths in stride_lengths.items() if lengths |
|
|
} |
|
|
|
|
|
|
|
|
df = pd.DataFrame.from_dict( |
|
|
{(i, j): stats[i][j] |
|
|
for i in stats.keys() |
|
|
for j in stats[i].keys()}, |
|
|
orient='index' |
|
|
) |
|
|
df.to_csv(f'{self.result_dir}/data/stride_length_stats.csv') |
|
|
|
|
|
print("步幅长度分析完成!") |
|
|
return stats |
|
|
|
|
|
def _plot_stride_stats(self, stride_lengths: Dict[str, List[float]]): |
|
|
"""绘制步幅长度统计图""" |
|
|
plt.figure(figsize=(10, 6)) |
|
|
plt.style.use('seaborn-v0_8-whitegrid') |
|
|
|
|
|
colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'} |
|
|
paw_order = ['RF', 'RH', 'LF', 'LH'] |
|
|
|
|
|
box_plot = plt.boxplot([stride_lengths[paw] for paw in paw_order], |
|
|
tick_labels=paw_order, |
|
|
patch_artist=True, |
|
|
medianprops={'color': 'black'}, |
|
|
flierprops={'marker': 'o', |
|
|
'markerfacecolor': 'none', |
|
|
'markersize': 4}) |
|
|
|
|
|
|
|
|
for patch, paw in zip(box_plot['boxes'], paw_order): |
|
|
patch.set_facecolor(colors[paw]) |
|
|
patch.set_alpha(0.6) |
|
|
|
|
|
|
|
|
for i, paw in enumerate(paw_order, 1): |
|
|
plt.scatter([i] * len(stride_lengths[paw]), |
|
|
stride_lengths[paw], |
|
|
color=colors[paw], |
|
|
alpha=0.4, |
|
|
s=30) |
|
|
|
|
|
plt.title('Stride Length Distribution by Paw Type', pad=20) |
|
|
plt.xlabel('Paw Type', labelpad=10) |
|
|
plt.ylabel('Stride Length (pixels)', labelpad=10) |
|
|
plt.grid(True, axis='y', linestyle='--', alpha=0.3) |
|
|
plt.tight_layout() |
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/stride_length.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
def analyze_stance_width(self): |
|
|
"""分析触地宽度(前后肢之间的左右距离)""" |
|
|
print("\n开始分析触地宽度...") |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
|
|
|
front_widths = [] |
|
|
hind_widths = [] |
|
|
|
|
|
|
|
|
for paw_type in paw_groups: |
|
|
paw_groups[paw_type].sort(key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
for rf in paw_groups['RF']: |
|
|
rf_center_x = rf.position['x'] + rf.position['width'] / 2 |
|
|
rf_frame = rf.start_frame |
|
|
|
|
|
|
|
|
closest_lf = min(paw_groups['LF'], |
|
|
key=lambda x: abs(x.start_frame - rf_frame), |
|
|
default=None) |
|
|
|
|
|
if closest_lf and abs(closest_lf.start_frame - rf_frame) < 10: |
|
|
lf_center_x = closest_lf.position['x'] + closest_lf.position['width'] / 2 |
|
|
front_width = abs(rf_center_x - lf_center_x) |
|
|
front_widths.append(front_width) |
|
|
|
|
|
|
|
|
for rh in paw_groups['RH']: |
|
|
rh_center_x = rh.position['x'] + rh.position['width'] / 2 |
|
|
rh_frame = rh.start_frame |
|
|
|
|
|
closest_lh = min(paw_groups['LH'], |
|
|
key=lambda x: abs(x.start_frame - rh_frame), |
|
|
default=None) |
|
|
|
|
|
if closest_lh and abs(closest_lh.start_frame - rh_frame) < 10: |
|
|
lh_center_x = closest_lh.position['x'] + closest_lh.position['width'] / 2 |
|
|
hind_width = abs(rh_center_x - lh_center_x) |
|
|
hind_widths.append(hind_width) |
|
|
|
|
|
|
|
|
stats = { |
|
|
'front_width': { |
|
|
'mean': np.mean(front_widths), |
|
|
'std': np.std(front_widths), |
|
|
'min': np.min(front_widths), |
|
|
'max': np.max(front_widths) |
|
|
} if front_widths else {}, |
|
|
'hind_width': { |
|
|
'mean': np.mean(hind_widths), |
|
|
'std': np.std(hind_widths), |
|
|
'min': np.min(hind_widths), |
|
|
'max': np.max(hind_widths) |
|
|
} if hind_widths else {} |
|
|
} |
|
|
|
|
|
|
|
|
self._plot_stance_width_stats(front_widths, hind_widths) |
|
|
|
|
|
|
|
|
df = pd.DataFrame.from_dict(stats, orient='index') |
|
|
df.to_csv(f'{self.result_dir}/data/stance_width_stats.csv') |
|
|
|
|
|
print("触地宽度分析完成!") |
|
|
return stats |
|
|
|
|
|
def _plot_stance_width_stats(self, front_widths: List[float], hind_widths: List[float]): |
|
|
"""绘制触地宽度统计图""" |
|
|
plt.figure(figsize=(8, 6)) |
|
|
plt.style.use('seaborn-v0_8-whitegrid') |
|
|
|
|
|
|
|
|
colors = { |
|
|
'Front': self.colors['RF'], |
|
|
'Hind': self.colors['RH'] |
|
|
} |
|
|
|
|
|
data = [front_widths, hind_widths] |
|
|
labels = ['Front', 'Hind'] |
|
|
|
|
|
box_plot = plt.boxplot(data, |
|
|
tick_labels=labels, |
|
|
patch_artist=True, |
|
|
medianprops={'color': 'black'}, |
|
|
flierprops={'marker': 'o', |
|
|
'markerfacecolor': 'none', |
|
|
'markersize': 4}) |
|
|
|
|
|
|
|
|
for patch, label in zip(box_plot['boxes'], labels): |
|
|
patch.set_facecolor(colors[label]) |
|
|
patch.set_alpha(0.6) |
|
|
|
|
|
|
|
|
for i, (widths, label) in enumerate(zip([front_widths, hind_widths], labels), 1): |
|
|
plt.scatter([i] * len(widths), |
|
|
widths, |
|
|
color=colors[label], |
|
|
alpha=0.4, |
|
|
s=30) |
|
|
|
|
|
plt.title('Stance Width Distribution', pad=20) |
|
|
plt.xlabel('Limb Type', labelpad=10) |
|
|
plt.ylabel('Width (pixels)', labelpad=10) |
|
|
plt.grid(True, axis='y', linestyle='--', alpha=0.3) |
|
|
plt.tight_layout() |
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/stance_width.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
def analyze_paw_area(self): |
|
|
"""分析脚爪面积(基于G通道的压力区域)""" |
|
|
print("\n开始分析脚爪面积...") |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture('exp_videos/Exp2.mp4') |
|
|
if not cap.isOpened(): |
|
|
raise ValueError("Cannot open video file") |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
|
|
|
paw_areas = {paw_type: [] for paw_type in paw_groups} |
|
|
|
|
|
|
|
|
pressure_threshold = 5 |
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
for footprint in footprints: |
|
|
|
|
|
max_area = 0 |
|
|
max_intensity = 0 |
|
|
avg_intensity = 0 |
|
|
|
|
|
|
|
|
for frame_idx in range(footprint.start_frame, footprint.end_frame): |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
continue |
|
|
|
|
|
|
|
|
x = footprint.position['x'] |
|
|
y = footprint.position['y'] |
|
|
w = footprint.position['width'] |
|
|
h = footprint.position['height'] |
|
|
|
|
|
|
|
|
x = max(0, x) |
|
|
y = max(0, y) |
|
|
roi = frame[y:y+h, x:x+w] |
|
|
|
|
|
if roi.size > 0: |
|
|
|
|
|
g_channel = roi[:, :, 1] |
|
|
current_area = np.sum(g_channel > pressure_threshold) |
|
|
max_area = max(max_area, current_area) |
|
|
max_intensity = max(max_intensity, np.max(g_channel)) |
|
|
avg_intensity = max(avg_intensity, np.mean(g_channel)) |
|
|
|
|
|
if max_area > 0: |
|
|
paw_areas[paw_type].append(max_area) |
|
|
|
|
|
|
|
|
stats = { |
|
|
paw_type: { |
|
|
'mean': np.mean(areas), |
|
|
'std': np.std(areas), |
|
|
'min': np.min(areas), |
|
|
'max': np.max(areas) |
|
|
} for paw_type, areas in paw_areas.items() if areas |
|
|
} |
|
|
|
|
|
|
|
|
self._plot_paw_area_stats(paw_areas) |
|
|
|
|
|
|
|
|
df = pd.DataFrame.from_dict( |
|
|
{(i, j): stats[i][j] |
|
|
for i in stats.keys() |
|
|
for j in stats[i].keys()}, |
|
|
orient='index' |
|
|
) |
|
|
df.to_csv(f'{self.result_dir}/data/paw_area_stats.csv') |
|
|
|
|
|
cap.release() |
|
|
print("脚爪面积分析完成!") |
|
|
return stats |
|
|
|
|
|
def _plot_paw_area_stats(self, paw_areas: Dict[str, List[float]]): |
|
|
"""绘制脚爪面积统计图""" |
|
|
plt.figure(figsize=(10, 6)) |
|
|
plt.style.use('seaborn-v0_8-whitegrid') |
|
|
|
|
|
colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'} |
|
|
paw_order = ['RF', 'RH', 'LF', 'LH'] |
|
|
|
|
|
box_plot = plt.boxplot([paw_areas[paw] for paw in paw_order], |
|
|
tick_labels=paw_order, |
|
|
patch_artist=True, |
|
|
medianprops={'color': 'black'}, |
|
|
flierprops={'marker': 'o', |
|
|
'markerfacecolor': 'none', |
|
|
'markersize': 4}) |
|
|
|
|
|
|
|
|
for patch, paw in zip(box_plot['boxes'], paw_order): |
|
|
patch.set_facecolor(colors[paw]) |
|
|
patch.set_alpha(0.6) |
|
|
|
|
|
|
|
|
for i, paw in enumerate(paw_order, 1): |
|
|
plt.scatter([i] * len(paw_areas[paw]), |
|
|
paw_areas[paw], |
|
|
color=colors[paw], |
|
|
alpha=0.4, |
|
|
s=30) |
|
|
|
|
|
plt.title('Paw Area Distribution by Paw Type', pad=20) |
|
|
plt.xlabel('Paw Type', labelpad=10) |
|
|
plt.ylabel('Area (pixels²)', labelpad=10) |
|
|
plt.grid(True, axis='y', linestyle='--', alpha=0.3) |
|
|
plt.tight_layout() |
|
|
|
|
|
plt.savefig(f'{self.result_dir}/plots/paw_area.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
def generate_detailed_table(self): |
|
|
print("\n开始生成详细数据表...") |
|
|
|
|
|
if not self.video_path: |
|
|
raise ValueError("未指定视频文件路径") |
|
|
|
|
|
|
|
|
print(f"\n检查关键点数据:") |
|
|
print(f"record_params keys: {self.record_params.keys()}") |
|
|
if 'bodyKeypoints' in self.record_params: |
|
|
print(f"bodyKeypoints 数量: {len(self.record_params['bodyKeypoints'])}") |
|
|
if self.record_params['bodyKeypoints']: |
|
|
print(f"第一个关键点数据示例: {self.record_params['bodyKeypoints'][0]}") |
|
|
else: |
|
|
print("未找到 bodyKeypoints 数据") |
|
|
|
|
|
|
|
|
keypoints_by_frame = {} |
|
|
for kp_data in self.record_params.get('bodyKeypoints', []): |
|
|
frame_id = kp_data['frame_id'] |
|
|
keypoints_by_frame[frame_id] = kp_data['keypoints'] |
|
|
|
|
|
print(f"处理后的关键点帧数: {len(keypoints_by_frame)}") |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(self.video_path) |
|
|
if not cap.isOpened(): |
|
|
raise ValueError(f"无法打开视频文件: {self.video_path}") |
|
|
|
|
|
detailed_data = [] |
|
|
movement_data = [] |
|
|
|
|
|
|
|
|
sorted_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame) |
|
|
|
|
|
for area in sorted_footprints: |
|
|
|
|
|
footprint_data = { |
|
|
'paw_type': area.paw_type, |
|
|
'initial_contact': area.start_frame / self.fps, |
|
|
'stand_duration': (area.end_frame - area.start_frame) / self.fps, |
|
|
'max_contact': area.end_frame / self.fps |
|
|
} |
|
|
|
|
|
|
|
|
middle_frame = area.start_frame + (area.end_frame - area.start_frame) // 2 |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) |
|
|
ret, frame = cap.read() |
|
|
if ret: |
|
|
|
|
|
if middle_frame in keypoints_by_frame: |
|
|
kp = keypoints_by_frame[middle_frame] |
|
|
nose = kp['nose'] |
|
|
tail_base = kp['tail_base'] |
|
|
|
|
|
|
|
|
x_min = min(nose[0], tail_base[0]) |
|
|
x_max = max(nose[0], tail_base[0]) |
|
|
margin = (x_max - x_min) * 0.1 |
|
|
x_min = max(0, int(x_min - margin)) |
|
|
x_max = min(frame.shape[1], int(x_max + margin)) |
|
|
|
|
|
|
|
|
nose_relative_x = int(nose[0] - x_min) |
|
|
nose_relative_y = int(nose[1]) |
|
|
tail_base_relative_x = int(tail_base[0] - x_min) |
|
|
tail_base_relative_y = int(tail_base[1]) |
|
|
|
|
|
|
|
|
full_body_roi = frame[0:frame.shape[0], x_min:x_max] |
|
|
_, buffer_full = cv2.imencode('.png', full_body_roi) |
|
|
footprint_data['image_fullbody'] = base64.b64encode(buffer_full).decode('utf-8') |
|
|
|
|
|
footprint_data['keypoints'] = { |
|
|
'nose': {'x': nose_relative_x, 'y': nose_relative_y}, |
|
|
'tail_base': {'x': tail_base_relative_x, 'y': tail_base_relative_y} |
|
|
} |
|
|
|
|
|
|
|
|
x = max(0, area.position['x']) |
|
|
y = max(0, area.position['y']) |
|
|
w = area.position['width'] |
|
|
h = area.position['height'] |
|
|
roi = frame[y:y+h, x:x+w] |
|
|
|
|
|
|
|
|
roi_g = roi.copy() |
|
|
roi_g[:, :, 0] = 0 |
|
|
roi_g[:, :, 2] = 0 |
|
|
|
|
|
_, buffer = cv2.imencode('.png', roi_g) |
|
|
footprint_data['image'] = base64.b64encode(buffer).decode('utf-8') |
|
|
footprint_data['frame_id'] = middle_frame |
|
|
|
|
|
|
|
|
footprint_data['print_length'] = area.position['height'] / 100 |
|
|
footprint_data['print_width'] = area.position['width'] / 100 |
|
|
|
|
|
|
|
|
max_intensity = 0 |
|
|
avg_intensity = 0 |
|
|
area_size = 0 |
|
|
pressure_threshold = 30 |
|
|
|
|
|
for frame_idx in range(area.start_frame, area.end_frame): |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) |
|
|
ret, frame = cap.read() |
|
|
if ret: |
|
|
roi = frame[y:y+h, x:x+w] |
|
|
if roi.size > 0: |
|
|
g_channel = roi[:, :, 1] |
|
|
current_area = np.sum(g_channel > pressure_threshold) |
|
|
area_size = max(area_size, current_area) |
|
|
max_intensity = max(max_intensity, np.max(g_channel)) |
|
|
avg_intensity = max(avg_intensity, np.mean(g_channel)) |
|
|
|
|
|
footprint_data['print_area'] = area_size / 100 |
|
|
footprint_data['max_intensity'] = int(max_intensity) |
|
|
footprint_data['average_intensity'] = round(avg_intensity, 2) |
|
|
|
|
|
detailed_data.append(footprint_data) |
|
|
|
|
|
cap.release() |
|
|
|
|
|
|
|
|
movement_angles = self._calculate_smooth_movement_angles() |
|
|
for frame_id, angle in movement_angles.items(): |
|
|
movement_data.append({ |
|
|
'frame_id': frame_id, |
|
|
'movement_angle': angle |
|
|
}) |
|
|
|
|
|
|
|
|
output_data = { |
|
|
"base_footprint_data": detailed_data, |
|
|
"movement_direction_data": movement_data |
|
|
} |
|
|
|
|
|
|
|
|
output_path = f'{self.result_dir}/data/detailed_footprint_data.json' |
|
|
with open(output_path, 'w', encoding='utf-8') as f: |
|
|
json.dump(output_data, f, indent=2, ensure_ascii=False) |
|
|
|
|
|
print(f"详细数据表已生成:{output_path}") |
|
|
|
|
|
|
|
|
print(f"\n数据统计:") |
|
|
print(f"- 足印数据条数: {len(detailed_data)}") |
|
|
print(f"- 运动方向数据条数: {len(movement_data)}") |
|
|
print(f"- 包含全身图像的足印数: {sum(1 for d in detailed_data if 'image_fullbody' in d)}") |
|
|
print(f"- 包含足印图像的足印数: {sum(1 for d in detailed_data if 'image' in d)}") |
|
|
|
|
|
|
|
|
if detailed_data: |
|
|
sample_data = detailed_data[0].copy() |
|
|
if 'image' in sample_data: |
|
|
sample_data['image'] = f"[base64 string of length {len(sample_data['image'])}]" |
|
|
if 'image_fullbody' in sample_data: |
|
|
sample_data['image_fullbody'] = f"[base64 string of length {len(sample_data['image_fullbody'])}]" |
|
|
print(f"\n示例数据结构:") |
|
|
print(json.dumps(sample_data, indent=2)) |
|
|
|
|
|
return output_data |
|
|
|
|
|
def _calculate_smooth_movement_angles(self): |
|
|
"""计算平滑的运动方向角度""" |
|
|
|
|
|
mid_points = [] |
|
|
frame_ids = [] |
|
|
for kp_data in self.record_params.get('bodyKeypoints', []): |
|
|
frame_ids.append(kp_data['frame_id']) |
|
|
mid_points.append(kp_data['keypoints']['mid']) |
|
|
|
|
|
if not mid_points: |
|
|
return {} |
|
|
|
|
|
|
|
|
points = np.array(mid_points) |
|
|
|
|
|
|
|
|
window_length = min(len(points), 15) |
|
|
if window_length % 2 == 0: |
|
|
window_length -= 1 |
|
|
if window_length >= 3: |
|
|
smooth_x = savgol_filter(points[:, 0], window_length, 3) |
|
|
smooth_y = savgol_filter(points[:, 1], window_length, 3) |
|
|
else: |
|
|
smooth_x = points[:, 0] |
|
|
smooth_y = points[:, 1] |
|
|
|
|
|
|
|
|
angles = {} |
|
|
for i in range(len(smooth_x)-1): |
|
|
dx = smooth_x[i+1] - smooth_x[i] |
|
|
dy = smooth_y[i+1] - smooth_y[i] |
|
|
angle = np.degrees(np.arctan2(dy, dx)) |
|
|
angles[frame_ids[i]] = angle |
|
|
|
|
|
|
|
|
if frame_ids: |
|
|
angles[frame_ids[-1]] = angles[frame_ids[-2]] if frame_ids[-2] in angles else 0 |
|
|
|
|
|
return angles |
|
|
|
|
|
def generate_collection_table(self): |
|
|
"""生成采集数据表格(Overview) |
|
|
从record_params获取:实验名称、开始时间、方向等信息 |
|
|
计算得出:运行时长、平均速度、步数、步频 |
|
|
""" |
|
|
|
|
|
direction_map = { |
|
|
"left_to_right": "L to R", |
|
|
"right_to_left": "R to L" |
|
|
} |
|
|
|
|
|
input_params = { |
|
|
"name": self.record_params.get("name", "未命名记录"), |
|
|
"start_time": self.record_params.get("start_time", datetime.now().strftime("%Y-%m-%d %H:%M:%S")), |
|
|
"direction": direction_map.get(self.record_params.get("direction", "left_to_right"), "L to R"), |
|
|
"walkway_length": f"{self.record_params.get('actual_length', 20)} cm", |
|
|
"walkway_width": f"{self.record_params.get('crop_height', 152)/8:.1f} cm", |
|
|
"weight": f"{self.record_params.get('weight', 0)} g" |
|
|
} |
|
|
|
|
|
|
|
|
sorted_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame) |
|
|
if sorted_footprints: |
|
|
first_frame = sorted_footprints[0].start_frame |
|
|
last_frame = sorted_footprints[-1].end_frame |
|
|
duration_seconds = (last_frame - first_frame) / self.fps |
|
|
|
|
|
|
|
|
steps_count = len(sorted_footprints) |
|
|
|
|
|
|
|
|
cadence = steps_count / duration_seconds if duration_seconds > 0 else 0 |
|
|
|
|
|
|
|
|
try: |
|
|
walkway_length_cm = float(input_params["walkway_length"].split()[0]) |
|
|
average_speed = walkway_length_cm / duration_seconds |
|
|
except (ValueError, IndexError): |
|
|
average_speed = 0 |
|
|
else: |
|
|
duration_seconds = 0 |
|
|
steps_count = 0 |
|
|
cadence = 0 |
|
|
average_speed = 0 |
|
|
|
|
|
|
|
|
collection_data = { |
|
|
|
|
|
"name": input_params["name"], |
|
|
"start_time": input_params["start_time"], |
|
|
"direction": input_params["direction"], |
|
|
"walkway_length": input_params["walkway_length"], |
|
|
"walkway_width": input_params["walkway_width"], |
|
|
"weight": input_params["weight"], |
|
|
|
|
|
|
|
|
"run_duration": f"{duration_seconds:.1f} s", |
|
|
"average_speed": f"{average_speed:.3f} cm/s", |
|
|
"steps_count": steps_count, |
|
|
"cadence": f"{cadence:.1f} steps/sec" |
|
|
} |
|
|
|
|
|
return collection_data |
|
|
|
|
|
def generate_paw_statistics(self) -> dict: |
|
|
"""生成足印统计数据表格,按照不同类别组织数据 |
|
|
|
|
|
Returns: |
|
|
dict: 包含每个足印的统计数据,按类别分组 |
|
|
""" |
|
|
print("\n开始生成足印统计数据...") |
|
|
|
|
|
|
|
|
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
for paw_type in paw_groups: |
|
|
paw_groups[paw_type].sort(key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(self.video_path) |
|
|
if not cap.isOpened(): |
|
|
raise ValueError("无法打开视频文件") |
|
|
|
|
|
paw_stats = {} |
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
paw_stats[paw_type] = [] |
|
|
|
|
|
for i, footprint in enumerate(footprints): |
|
|
|
|
|
x = footprint.position['x'] |
|
|
y = footprint.position['y'] |
|
|
width = footprint.position['width'] |
|
|
height = footprint.position['height'] |
|
|
|
|
|
|
|
|
max_intensity = 0 |
|
|
mean_intensity = 0 |
|
|
min_intensity = 255 |
|
|
max_area = 0 |
|
|
pressure_threshold = 5 |
|
|
|
|
|
|
|
|
for frame_idx in range(footprint.start_frame, footprint.end_frame + 1): |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) |
|
|
ret, frame = cap.read() |
|
|
if ret: |
|
|
roi = frame[y:y+height, x:x+width] |
|
|
if roi.size > 0: |
|
|
g_channel = roi[:, :, 1] |
|
|
current_max = np.max(g_channel) |
|
|
current_mean = np.mean(g_channel) |
|
|
|
|
|
pressure_pixels = g_channel[g_channel > pressure_threshold] |
|
|
if pressure_pixels.size > 0: |
|
|
current_min = np.min(pressure_pixels) |
|
|
min_intensity = min(min_intensity, current_min) |
|
|
current_area = np.sum(g_channel > pressure_threshold) |
|
|
|
|
|
max_intensity = max(max_intensity, current_max) |
|
|
mean_intensity = max(mean_intensity, current_mean) |
|
|
max_area = max(max_area, current_area) |
|
|
|
|
|
|
|
|
initial_contact = footprint.start_frame / self.fps |
|
|
max_contact = footprint.end_frame / self.fps |
|
|
stand_duration = (footprint.end_frame - footprint.start_frame) / self.fps |
|
|
|
|
|
swing_time = 0 |
|
|
swing_speed = 0 |
|
|
stride_length = 0 |
|
|
if i < len(footprints) - 1: |
|
|
next_print = footprints[i + 1] |
|
|
swing_time = (next_print.start_frame - footprint.end_frame) / self.fps |
|
|
|
|
|
stride_length = abs( |
|
|
(next_print.position['x'] + next_print.position['width']/2) - |
|
|
(x + width/2) |
|
|
) * scale_factor |
|
|
|
|
|
if swing_time > 0: |
|
|
swing_speed = stride_length / swing_time |
|
|
|
|
|
|
|
|
footprint_data = { |
|
|
|
|
|
"basic_data": { |
|
|
"x_position": round(x * scale_factor, 1), |
|
|
"y_position": round(y * scale_factor, 1), |
|
|
"length": round(height * scale_factor, 1), |
|
|
"width": round(width * scale_factor, 1), |
|
|
"max_area": round(max_area * (scale_factor ** 2), 1) |
|
|
}, |
|
|
|
|
|
|
|
|
"time_points": { |
|
|
"initial_contact": round(initial_contact, 3), |
|
|
"stand": round(stand_duration, 3), |
|
|
"max_contact": round(max_contact, 3) |
|
|
}, |
|
|
|
|
|
|
|
|
"intensity": { |
|
|
"max_intensity": int(max_intensity), |
|
|
"min_intensity": int(min_intensity) if min_intensity < 255 else 0, |
|
|
"mean_intensity": round(mean_intensity, 1), |
|
|
"max_contact_ratio": round((max_intensity / 255) * 100, 1), |
|
|
"max_intensity_ratio": round((mean_intensity / max_intensity) * 100, 1) if max_intensity > 0 else 0 |
|
|
}, |
|
|
|
|
|
|
|
|
"support": { |
|
|
"support_duration": round(stand_duration, 3), |
|
|
"support_ratio": round((stand_duration / (stand_duration + swing_time)) * 100, 1) if (stand_duration + swing_time) > 0 else 0, |
|
|
"initial_bipedal_support": round(initial_contact, 3), |
|
|
"final_bipedal_support": round(max_contact, 3) |
|
|
}, |
|
|
|
|
|
|
|
|
"toe": { |
|
|
"toe_spread": round(width * scale_factor, 1), |
|
|
"intermediate_toe_spread": round(width * scale_factor * 0.5, 1), |
|
|
"angle_between_motion": 10, |
|
|
"angle_between_body_axis": 10 |
|
|
}, |
|
|
|
|
|
|
|
|
"other": { |
|
|
"swing": round(swing_time, 3), |
|
|
"swing_speed": round(swing_speed, 1), |
|
|
"stride_length": round(stride_length, 1), |
|
|
"step_cycle": round(stand_duration + swing_time, 3), |
|
|
"body_speed": round(stride_length / (stand_duration + swing_time), 1) if (stand_duration + swing_time) > 0 else 0 |
|
|
} |
|
|
} |
|
|
|
|
|
paw_stats[paw_type].append(footprint_data) |
|
|
|
|
|
cap.release() |
|
|
print("足印统计数据生成完成!") |
|
|
return paw_stats |
|
|
|
|
|
def generate_step_sequence_table(self) -> dict: |
|
|
"""生成步序数据统计表格 |
|
|
|
|
|
Returns: |
|
|
dict: 包含步序统计数据的字典 |
|
|
""" |
|
|
print("\n开始生成步序统计数据...") |
|
|
|
|
|
|
|
|
GAIT_PATTERNS = { |
|
|
'LH-RF-RH-LF': 'Aa', |
|
|
'LH-LF-RH-RF': 'Ab', |
|
|
'LH-RF-LF-RH': 'Ca', |
|
|
'LH-RH-LF-RF': 'Cb', |
|
|
'LH-RH-RF-LF': 'Ra', |
|
|
'LH-LF-RF-RH': 'Rb' |
|
|
} |
|
|
|
|
|
|
|
|
all_footprints = [] |
|
|
for area in self.footprint_areas: |
|
|
all_footprints.append({ |
|
|
'paw_type': area.paw_type, |
|
|
'start_time': area.start_frame / self.fps |
|
|
}) |
|
|
|
|
|
|
|
|
all_footprints.sort(key=lambda x: x['start_time']) |
|
|
|
|
|
|
|
|
sequences = [] |
|
|
current_seq = [] |
|
|
|
|
|
for fp in all_footprints: |
|
|
if not current_seq and fp['paw_type'] == 'LH': |
|
|
current_seq = [fp] |
|
|
elif current_seq: |
|
|
current_seq.append(fp) |
|
|
if len(current_seq) == 4: |
|
|
|
|
|
pattern = '-'.join([p['paw_type'] for p in current_seq]) |
|
|
if pattern in GAIT_PATTERNS: |
|
|
sequences.append(GAIT_PATTERNS[pattern]) |
|
|
current_seq = [] |
|
|
|
|
|
|
|
|
pattern_counts = { |
|
|
'Aa': 0, 'Ab': 0, 'Ca': 0, |
|
|
'Cb': 0, 'Ra': 0, 'Rb': 0 |
|
|
} |
|
|
|
|
|
for seq in sequences: |
|
|
if seq in pattern_counts: |
|
|
pattern_counts[seq] += 1 |
|
|
|
|
|
total_sequences = len(sequences) |
|
|
|
|
|
|
|
|
sequence_stats = { |
|
|
"summary": { |
|
|
"total_sequences": total_sequences, |
|
|
"normal_sequences": sum(pattern_counts.values()), |
|
|
"abnormal_sequences": total_sequences - sum(pattern_counts.values()) |
|
|
}, |
|
|
"patterns": [ |
|
|
{ |
|
|
"pattern_code": pattern, |
|
|
"count": count, |
|
|
"percentage": round((count / total_sequences * 100), 1) if total_sequences > 0 else 0 |
|
|
} |
|
|
for pattern, count in pattern_counts.items() |
|
|
], |
|
|
"raw_sequence": sequences |
|
|
} |
|
|
|
|
|
print("步序统计数据生成完成!") |
|
|
return sequence_stats |
|
|
|
|
|
def generate_foot_spacing_table(self) -> dict: |
|
|
"""生成足间距数据统计表格 |
|
|
|
|
|
Returns: |
|
|
dict: 包含足间距统计数据的字典 |
|
|
""" |
|
|
print("\n开始生成足间距统计数据...") |
|
|
|
|
|
|
|
|
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
|
|
|
def get_center_point(footprint): |
|
|
x = footprint.position['x'] + footprint.position['width'] / 2 |
|
|
y = footprint.position['y'] + footprint.position['height'] / 2 |
|
|
return (x, y) |
|
|
|
|
|
|
|
|
def calculate_distance(p1, p2): |
|
|
return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2) * scale_factor |
|
|
|
|
|
|
|
|
spacing_stats = { |
|
|
"front_paws": { |
|
|
"count": 0, |
|
|
"mean_spacing": 0, |
|
|
"distances": [] |
|
|
}, |
|
|
"hind_paws": { |
|
|
"count": 0, |
|
|
"mean_spacing": 0, |
|
|
"distances": [] |
|
|
}, |
|
|
"paw_pairs": { |
|
|
"RF": { |
|
|
"count": 0, |
|
|
"mean_spacing": 0, |
|
|
"standard_deviation": 0, |
|
|
"distances": [] |
|
|
}, |
|
|
"LF": { |
|
|
"count": 0, |
|
|
"mean_spacing": 0, |
|
|
"standard_deviation": 0, |
|
|
"distances": [] |
|
|
}, |
|
|
"LH": { |
|
|
"count": 0, |
|
|
"mean_spacing": 0, |
|
|
"standard_deviation": 0, |
|
|
"distances": [] |
|
|
}, |
|
|
"RH": { |
|
|
"count": 0, |
|
|
"mean_spacing": 0, |
|
|
"standard_deviation": 0, |
|
|
"distances": [] |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if len(paw_groups['RF']) > 0 and len(paw_groups['LF']) > 0: |
|
|
for rf in paw_groups['RF']: |
|
|
rf_center = get_center_point(rf) |
|
|
for lf in paw_groups['LF']: |
|
|
lf_center = get_center_point(lf) |
|
|
distance = calculate_distance(rf_center, lf_center) |
|
|
spacing_stats["front_paws"]["distances"].append(distance) |
|
|
|
|
|
|
|
|
if len(paw_groups['RH']) > 0 and len(paw_groups['LH']) > 0: |
|
|
for rh in paw_groups['RH']: |
|
|
rh_center = get_center_point(rh) |
|
|
for lh in paw_groups['LH']: |
|
|
lh_center = get_center_point(lh) |
|
|
distance = calculate_distance(rh_center, lh_center) |
|
|
spacing_stats["hind_paws"]["distances"].append(distance) |
|
|
|
|
|
|
|
|
for paw_type in ['RF', 'LF', 'LH', 'RH']: |
|
|
if len(paw_groups[paw_type]) > 1: |
|
|
for i in range(len(paw_groups[paw_type]) - 1): |
|
|
current = paw_groups[paw_type][i] |
|
|
next_paw = paw_groups[paw_type][i + 1] |
|
|
distance = calculate_distance( |
|
|
get_center_point(current), |
|
|
get_center_point(next_paw) |
|
|
) |
|
|
spacing_stats["paw_pairs"][paw_type]["distances"].append(distance) |
|
|
|
|
|
|
|
|
|
|
|
if spacing_stats["front_paws"]["distances"]: |
|
|
distances = spacing_stats["front_paws"]["distances"] |
|
|
spacing_stats["front_paws"].update({ |
|
|
"count": len(distances), |
|
|
"mean_spacing": round(np.mean(distances), 1) |
|
|
}) |
|
|
|
|
|
|
|
|
if spacing_stats["hind_paws"]["distances"]: |
|
|
distances = spacing_stats["hind_paws"]["distances"] |
|
|
spacing_stats["hind_paws"].update({ |
|
|
"count": len(distances), |
|
|
"mean_spacing": round(np.mean(distances), 1) |
|
|
}) |
|
|
|
|
|
|
|
|
for paw_type in ['RF', 'LF', 'LH', 'RH']: |
|
|
if spacing_stats["paw_pairs"][paw_type]["distances"]: |
|
|
distances = spacing_stats["paw_pairs"][paw_type]["distances"] |
|
|
mean_spacing = np.mean(distances) |
|
|
std_dev = np.std(distances) |
|
|
spacing_stats["paw_pairs"][paw_type].update({ |
|
|
"count": len(distances), |
|
|
"mean_spacing": round(mean_spacing, 1), |
|
|
"standard_deviation": round((std_dev / mean_spacing) * 100, 1) |
|
|
}) |
|
|
|
|
|
print("足间距统计数据生成完成!") |
|
|
return spacing_stats |
|
|
|
|
|
def generate_support_table(self) -> dict: |
|
|
"""生成支撑统计数据(带中英文字段)""" |
|
|
|
|
|
support_type_mapping = { |
|
|
"diagonal": "对角支撑", |
|
|
"four": "四肢支撑", |
|
|
"girdle": "同源支撑", |
|
|
"lateral": "同侧支撑", |
|
|
"single": "单肢支撑", |
|
|
"standing": "站立支撑", |
|
|
"three": "三肢支撑", |
|
|
"zero": "无支撑" |
|
|
} |
|
|
|
|
|
support_stats = { |
|
|
"support_sequence": [], |
|
|
"support_types": { |
|
|
"diagonal": 0.0, |
|
|
"four": 0.0, |
|
|
"girdle": 0.0, |
|
|
"lateral": 0.0, |
|
|
"single": 0.0, |
|
|
"standing": 0.0, |
|
|
"three": 0.0, |
|
|
"zero": 0.0 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
all_footprints = sorted( |
|
|
[{ |
|
|
'start_time': area.start_frame / self.fps, |
|
|
'end_time': area.end_frame / self.fps, |
|
|
'duration': (area.end_frame - area.start_frame) / self.fps, |
|
|
'paw_type': area.paw_type |
|
|
} for area in self.footprint_areas], |
|
|
key=lambda x: x['start_time'] |
|
|
) |
|
|
|
|
|
|
|
|
for fp in all_footprints: |
|
|
|
|
|
active_paws = [ |
|
|
p['paw_type'] for p in all_footprints |
|
|
if p['start_time'] <= fp['start_time'] < (p['start_time'] + p['duration']) |
|
|
] |
|
|
support_count = len(active_paws) |
|
|
|
|
|
|
|
|
support_formula = str(min(support_count, 4)) |
|
|
|
|
|
|
|
|
if support_count == 1: |
|
|
|
|
|
footfall_code = f"I-{active_paws[0]}" |
|
|
elif support_count == 2: |
|
|
|
|
|
paw1, paw2 = sorted(active_paws) |
|
|
|
|
|
|
|
|
if (paw1[0] != paw2[0]) and (paw1[1] != paw2[1]): |
|
|
type_code = "D" |
|
|
pos_code = paw1[1]+paw2[1] |
|
|
elif paw1[0] == paw2[0]: |
|
|
type_code = "G" |
|
|
pos_code = paw1[0] |
|
|
else: |
|
|
type_code = "L" |
|
|
pos_code = paw1[0] |
|
|
|
|
|
footfall_code = f"II-{type_code}{pos_code}" |
|
|
elif support_count == 3: |
|
|
|
|
|
lead_paw = min(active_paws) |
|
|
footfall_code = f"III-{lead_paw}" |
|
|
else: |
|
|
footfall_code = "IV" |
|
|
|
|
|
|
|
|
support_stats["support_sequence"].append({ |
|
|
"start_time": round(fp['start_time'], 4), |
|
|
"duration": round(fp['duration'], 4), |
|
|
"support_formula": support_formula, |
|
|
"footfall_formula": footfall_code |
|
|
}) |
|
|
|
|
|
|
|
|
support_type = "unknown" |
|
|
|
|
|
|
|
|
if support_count == 0: |
|
|
support_type = "zero" |
|
|
elif support_count == 1: |
|
|
support_type = "single" |
|
|
elif support_count == 2: |
|
|
if footfall_code.startswith("II-D"): |
|
|
support_type = "diagonal" |
|
|
elif footfall_code.startswith("II-G"): |
|
|
support_type = "girdle" |
|
|
elif footfall_code.startswith("II-L"): |
|
|
support_type = "lateral" |
|
|
else: |
|
|
support_type = "unknown_dual" |
|
|
elif support_count == 3: |
|
|
support_type = "three" |
|
|
elif support_count == 4: |
|
|
support_type = "four" |
|
|
|
|
|
|
|
|
if self._is_standing_state(fp['start_time']): |
|
|
support_type = "standing" |
|
|
|
|
|
|
|
|
if support_type in support_stats["support_types"]: |
|
|
support_stats["support_types"][support_type] += fp['duration'] |
|
|
else: |
|
|
logging.warning(f"未知支撑类型: {support_type} 时间: {fp['start_time']}") |
|
|
continue |
|
|
|
|
|
|
|
|
total_duration = sum(support_stats["support_types"].values()) |
|
|
if total_duration > 0: |
|
|
for k in support_stats["support_types"]: |
|
|
support_stats["support_types"][k] = round( |
|
|
(support_stats["support_types"][k] / total_duration) * 100, 4 |
|
|
) |
|
|
|
|
|
print("足支撑统计数据生成完成!") |
|
|
return support_stats |
|
|
|
|
|
def generate_coordination_table(self) -> dict: |
|
|
"""生成双足协调性数据统计表格 |
|
|
|
|
|
Returns: |
|
|
dict: 包含双足协调性统计数据的字典 |
|
|
""" |
|
|
print("\n开始生成双足协调性统计数据...") |
|
|
|
|
|
|
|
|
coordination_stats = { |
|
|
"summary": { |
|
|
"standard_mean": 0, |
|
|
"standard_deviation": 0, |
|
|
"mistakes": 0 |
|
|
}, |
|
|
"pairs": { |
|
|
"LF-RH": [], |
|
|
"LF-LH": [], |
|
|
"LF-RF": [], |
|
|
"RH-LF": [], |
|
|
"RH-LH": [], |
|
|
"RH-RF": [] |
|
|
}, |
|
|
"coordination_sequence": [] |
|
|
} |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
if area.paw_type in paw_groups: |
|
|
paw_groups[area.paw_type].append({ |
|
|
'paw_type': area.paw_type, |
|
|
'start_time': area.start_frame / self.fps, |
|
|
'duration': (area.end_frame - area.start_frame) / self.fps, |
|
|
'end_time': area.end_frame / self.fps |
|
|
}) |
|
|
|
|
|
|
|
|
for paw_type in paw_groups: |
|
|
paw_groups[paw_type].sort(key=lambda x: x['start_time']) |
|
|
|
|
|
|
|
|
|
|
|
def analyze_pair_coordination(anchor_prints, target_prints, pair_type): |
|
|
pair_stats = [] |
|
|
for anchor in anchor_prints: |
|
|
|
|
|
if anchor['duration'] <= 0 or anchor['start_time'] < 0: |
|
|
continue |
|
|
|
|
|
|
|
|
valid_targets = [ |
|
|
t for t in target_prints |
|
|
if t['start_time'] >= anchor['start_time'] - 0.5 |
|
|
and t['start_time'] <= anchor['end_time'] + 0.5 |
|
|
] |
|
|
|
|
|
for target in valid_targets: |
|
|
|
|
|
time_diff = target['start_time'] - anchor['start_time'] |
|
|
value = (time_diff / anchor['duration']) * 100 |
|
|
|
|
|
|
|
|
if time_diff < 0: |
|
|
peculiarity = "ANT" |
|
|
value = abs(value) |
|
|
elif 0 <= value <= 100: |
|
|
peculiarity = "NOR" |
|
|
else: |
|
|
peculiarity = "DEL" |
|
|
value = value % 100 |
|
|
|
|
|
|
|
|
if abs(time_diff) > 2.0: |
|
|
continue |
|
|
|
|
|
coordination_entry = { |
|
|
"value": round(value, 4), |
|
|
"anchor_start": round(anchor['start_time'], 4), |
|
|
"anchor_duration": round(anchor['duration'], 4), |
|
|
"target_start": round(target['start_time'], 4), |
|
|
"peculiarity": peculiarity |
|
|
} |
|
|
pair_stats.append(coordination_entry) |
|
|
break |
|
|
|
|
|
return pair_stats |
|
|
|
|
|
pair_configs = [ |
|
|
('LF', 'RH', 'LF-RH'), |
|
|
('LF', 'LH', 'LF-LH'), |
|
|
('LF', 'RF', 'LF-RF'), |
|
|
('RH', 'LF', 'RH-LF'), |
|
|
('RH', 'LH', 'RH-LH'), |
|
|
('RH', 'RF', 'RH-RF') |
|
|
] |
|
|
|
|
|
all_values = [] |
|
|
mistakes_count = 0 |
|
|
|
|
|
for anchor_type, target_type, pair_name in pair_configs: |
|
|
pair_stats = analyze_pair_coordination( |
|
|
paw_groups[anchor_type], |
|
|
paw_groups[target_type], |
|
|
pair_name |
|
|
) |
|
|
coordination_stats["pairs"][pair_name] = pair_stats |
|
|
|
|
|
|
|
|
values = [stat["value"] for stat in pair_stats if stat["value"] > 0] |
|
|
all_values.extend(values) |
|
|
|
|
|
|
|
|
mistakes_count += sum(1 for stat in pair_stats if stat["peculiarity"] != "NOR") |
|
|
|
|
|
|
|
|
if all_values: |
|
|
coordination_stats["summary"].update({ |
|
|
"standard_mean": round(np.mean(all_values), 0), |
|
|
"standard_deviation": round(np.std(all_values), 0), |
|
|
"mistakes": mistakes_count |
|
|
}) |
|
|
|
|
|
print("双足协调性统计数据生成完成!") |
|
|
return coordination_stats |
|
|
|
|
|
def _is_standing_state(self, current_time: float) -> bool: |
|
|
"""修复后的站立状态判断""" |
|
|
|
|
|
start_time = current_time - 0.5 |
|
|
hind_prints = [ |
|
|
p for p in self.footprint_areas |
|
|
if p.paw_type in ('LH', 'RH') and |
|
|
(p.start_frame / self.fps) <= current_time <= (p.end_frame / self.fps) and |
|
|
(p.start_frame / self.fps) >= start_time |
|
|
] |
|
|
|
|
|
|
|
|
return ( |
|
|
len(hind_prints) == 2 and |
|
|
all((p.end_frame - p.start_frame)/self.fps >= 0.5 for p in hind_prints) |
|
|
) |
|
|
|
|
|
def generate_3d_footprint_analysis(self): |
|
|
"""生成3D足印分析数据""" |
|
|
print("\n开始生成3D足印可视化...") |
|
|
|
|
|
|
|
|
footprint_3d_dir = f'{self.result_dir}/plots/footprints_3d' |
|
|
os.makedirs(footprint_3d_dir, exist_ok=True) |
|
|
|
|
|
html_dir = f'{self.result_dir}/plots/interactive_3d' |
|
|
os.makedirs(html_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
self._generate_3d_footprint_plots() |
|
|
|
|
|
|
|
|
interactive_3d_data = self._generate_interactive_3d_footprints() |
|
|
|
|
|
|
|
|
self._create_3d_footprint_index(html_dir) |
|
|
|
|
|
return interactive_3d_data |
|
|
|
|
|
def _generate_3d_footprint_plots(self): |
|
|
"""为每个足印区域生成3D图像""" |
|
|
import matplotlib.pyplot as plt |
|
|
from mpl_toolkits.mplot3d import Axes3D |
|
|
import numpy as np |
|
|
import os |
|
|
|
|
|
|
|
|
footprint_3d_dir = f'{self.result_dir}/plots/footprints_3d' |
|
|
os.makedirs(footprint_3d_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
cluster_groups = {} |
|
|
for footprint in self.footprint_areas: |
|
|
if footprint.area_id not in cluster_groups: |
|
|
cluster_groups[footprint.area_id] = [] |
|
|
cluster_groups[footprint.area_id].append(footprint) |
|
|
|
|
|
|
|
|
for cluster_id, footprints in cluster_groups.items(): |
|
|
|
|
|
sorted_footprints = sorted(footprints, key=lambda x: x.start_frame) |
|
|
key_footprint = sorted_footprints[len(sorted_footprints)//2] |
|
|
|
|
|
|
|
|
if self.video_path: |
|
|
cap = cv2.VideoCapture(self.video_path) |
|
|
if not cap.isOpened(): |
|
|
print(f"无法打开视频: {self.video_path}") |
|
|
continue |
|
|
|
|
|
|
|
|
middle_frame = (key_footprint.start_frame + key_footprint.end_frame) // 2 |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) |
|
|
ret, frame = cap.read() |
|
|
|
|
|
if not ret: |
|
|
print(f"无法读取帧: {middle_frame}") |
|
|
cap.release() |
|
|
continue |
|
|
|
|
|
|
|
|
x = max(0, key_footprint.position['x']) |
|
|
y = max(0, key_footprint.position['y']) |
|
|
w = key_footprint.position['width'] |
|
|
h = key_footprint.position['height'] |
|
|
|
|
|
if x+w > frame.shape[1] or y+h > frame.shape[0]: |
|
|
print(f"ROI超出图像范围: {x},{y},{w},{h}") |
|
|
cap.release() |
|
|
continue |
|
|
|
|
|
roi = frame[y:y+h, x:x+w] |
|
|
|
|
|
|
|
|
green_channel = roi[:,:,1] |
|
|
|
|
|
|
|
|
max_side = max(green_channel.shape[0], green_channel.shape[1]) |
|
|
square_patch = np.zeros((max_side, max_side), dtype=np.uint8) |
|
|
|
|
|
|
|
|
start_y = (max_side - green_channel.shape[0]) // 2 |
|
|
start_x = (max_side - green_channel.shape[1]) // 2 |
|
|
square_patch[start_y:start_y+green_channel.shape[0], |
|
|
start_x:start_x+green_channel.shape[1]] = green_channel |
|
|
|
|
|
|
|
|
x = np.linspace(0, square_patch.shape[1]-1, square_patch.shape[1]) |
|
|
y = np.linspace(0, square_patch.shape[0]-1, square_patch.shape[0]) |
|
|
X, Y = np.meshgrid(x, y) |
|
|
|
|
|
|
|
|
fig = plt.figure(figsize=(10, 8)) |
|
|
ax = fig.add_subplot(111, projection='3d') |
|
|
|
|
|
|
|
|
surf = ax.plot_surface(X, Y, square_patch, cmap='viridis') |
|
|
|
|
|
|
|
|
paw_type = key_footprint.paw_type |
|
|
ax.set_title(f'Footprint 3D View - {paw_type} #{cluster_id}') |
|
|
ax.set_xlabel('X') |
|
|
ax.set_ylabel('Y') |
|
|
ax.set_zlabel('Intensity') |
|
|
|
|
|
|
|
|
fig.colorbar(surf) |
|
|
|
|
|
|
|
|
plt.savefig(f'{footprint_3d_dir}/footprint_{cluster_id}_{paw_type}.png', |
|
|
dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
cap.release() |
|
|
|
|
|
def _generate_interactive_3d_footprints(self): |
|
|
"""生成交互式3D足印热图可视化并返回base64编码的HTML内容""" |
|
|
try: |
|
|
import plotly.graph_objects as go |
|
|
from plotly.subplots import make_subplots |
|
|
import numpy as np |
|
|
import os |
|
|
import cv2 |
|
|
from scipy.interpolate import griddata |
|
|
except ImportError: |
|
|
print("请安装必要的库: pip install plotly scipy") |
|
|
return {} |
|
|
|
|
|
|
|
|
html_dir = f'{self.result_dir}/plots/interactive_3d' |
|
|
os.makedirs(html_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
cluster_groups = {} |
|
|
for footprint in self.footprint_areas: |
|
|
if footprint.area_id not in cluster_groups: |
|
|
cluster_groups[footprint.area_id] = [] |
|
|
cluster_groups[footprint.area_id].append(footprint) |
|
|
|
|
|
|
|
|
html_data = {} |
|
|
|
|
|
|
|
|
for cluster_id, footprints in cluster_groups.items(): |
|
|
|
|
|
sorted_footprints = sorted(footprints, key=lambda x: x.start_frame) |
|
|
key_footprint = sorted_footprints[len(sorted_footprints)//2] |
|
|
|
|
|
paw_type = key_footprint.paw_type |
|
|
html_path = f'{html_dir}/footprint_{cluster_id}_{paw_type}.html' |
|
|
|
|
|
|
|
|
if self.video_path: |
|
|
cap = cv2.VideoCapture(self.video_path) |
|
|
if not cap.isOpened(): |
|
|
print(f"无法打开视频: {self.video_path}") |
|
|
continue |
|
|
|
|
|
|
|
|
middle_frame = (key_footprint.start_frame + key_footprint.end_frame) // 2 |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) |
|
|
ret, frame = cap.read() |
|
|
|
|
|
if not ret: |
|
|
print(f"无法读取帧: {middle_frame}") |
|
|
cap.release() |
|
|
continue |
|
|
|
|
|
|
|
|
x = max(0, key_footprint.position['x']) |
|
|
y = max(0, key_footprint.position['y']) |
|
|
w = key_footprint.position['width'] |
|
|
h = key_footprint.position['height'] |
|
|
|
|
|
if x+w > frame.shape[1] or y+h > frame.shape[0]: |
|
|
print(f"ROI超出图像范围: {x},{y},{w},{h}") |
|
|
cap.release() |
|
|
continue |
|
|
|
|
|
roi = frame[y:y+h, x:x+w] |
|
|
|
|
|
|
|
|
if len(roi.shape) == 2 or (len(roi.shape) == 3 and roi.shape[2] == 1): |
|
|
|
|
|
if len(roi.shape) == 3: |
|
|
gray = roi[:,:,0] |
|
|
else: |
|
|
gray = roi |
|
|
else: |
|
|
|
|
|
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) |
|
|
|
|
|
|
|
|
lower_green = np.array([40, 40, 40]) |
|
|
upper_green = np.array([80, 255, 255]) |
|
|
|
|
|
|
|
|
mask = cv2.inRange(hsv, lower_green, upper_green) |
|
|
|
|
|
|
|
|
green_only = cv2.bitwise_and(roi, roi, mask=mask) |
|
|
|
|
|
|
|
|
gray = cv2.cvtColor(green_only, cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
|
|
|
smooth_gray = ndimage.gaussian_filter(gray, sigma=1.5) |
|
|
|
|
|
|
|
|
threshold = np.max(smooth_gray) * 0.1 if np.max(smooth_gray) > 0 else 0 |
|
|
filtered_gray = np.where(smooth_gray < threshold, 0, smooth_gray) |
|
|
|
|
|
|
|
|
max_side = max(filtered_gray.shape[0], filtered_gray.shape[1]) |
|
|
square_patch = np.zeros((max_side, max_side), dtype=np.uint8) |
|
|
|
|
|
|
|
|
start_y = (max_side - filtered_gray.shape[0]) // 2 |
|
|
start_x = (max_side - filtered_gray.shape[1]) // 2 |
|
|
square_patch[start_y:start_y+filtered_gray.shape[0], |
|
|
start_x:start_x+filtered_gray.shape[1]] = filtered_gray |
|
|
|
|
|
|
|
|
factor = 2 |
|
|
new_size = max_side * factor |
|
|
|
|
|
|
|
|
y_old, x_old = np.mgrid[0:max_side, 0:max_side] |
|
|
|
|
|
y_new, x_new = np.mgrid[0:max_side:complex(0, new_size), 0:max_side:complex(0, new_size)] |
|
|
|
|
|
|
|
|
z_upscaled = griddata((y_old.flatten(), x_old.flatten()), square_patch.flatten(), |
|
|
(y_new, x_new), method='cubic', fill_value=0) |
|
|
|
|
|
|
|
|
z_upscaled = ndimage.gaussian_filter(z_upscaled, sigma=1) |
|
|
|
|
|
|
|
|
fig = go.Figure(data=[go.Surface( |
|
|
z=z_upscaled, |
|
|
colorscale='Jet', |
|
|
colorbar=dict(title="强度"), |
|
|
contours = { |
|
|
"z": {"show": True, "start": 0, "end": 255, "size": 10} |
|
|
} |
|
|
)]) |
|
|
|
|
|
|
|
|
fig.update_layout( |
|
|
title=f'足印3D交互式热图 - {paw_type} #{cluster_id}', |
|
|
width=800, |
|
|
height=800, |
|
|
scene=dict( |
|
|
xaxis_title='X', |
|
|
yaxis_title='Y', |
|
|
zaxis_title='强度', |
|
|
aspectratio=dict(x=1, y=1, z=0.5), |
|
|
camera=dict( |
|
|
eye=dict(x=1.5, y=1.5, z=0.8) |
|
|
) |
|
|
), |
|
|
margin=dict(l=0, r=0, b=0, t=30) |
|
|
) |
|
|
|
|
|
|
|
|
fig.write_html( |
|
|
html_path, |
|
|
include_plotlyjs='cdn', |
|
|
full_html=True, |
|
|
config={ |
|
|
'displayModeBar': True, |
|
|
'editable': True, |
|
|
'toImageButtonOptions': { |
|
|
'format': 'png', |
|
|
'filename': f'footprint_{cluster_id}_{paw_type}', |
|
|
'height': 800, |
|
|
'width': 800, |
|
|
'scale': 2 |
|
|
} |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
with open(html_path, 'rb') as f: |
|
|
html_content = f.read() |
|
|
html_base64 = base64.b64encode(html_content).decode('utf-8') |
|
|
|
|
|
|
|
|
html_data[f"{cluster_id}_{paw_type}"] = { |
|
|
'html_base64': html_base64, |
|
|
'filename': f'footprint_{cluster_id}_{paw_type}.html', |
|
|
'paw_type': paw_type, |
|
|
'cluster_id': cluster_id |
|
|
} |
|
|
|
|
|
cap.release() |
|
|
|
|
|
print(f"交互式3D足印热图已保存至: {html_path}") |
|
|
|
|
|
|
|
|
self._create_3d_footprint_index(html_dir) |
|
|
|
|
|
return html_data |
|
|
|
|
|
def _create_3d_footprint_index(self, html_dir): |
|
|
"""创建3D足印可视化的HTML索引页面""" |
|
|
html_files = [f for f in os.listdir(html_dir) if f.endswith('.html') and f != 'index.html'] |
|
|
|
|
|
if not html_files: |
|
|
return |
|
|
|
|
|
index_html = f'{html_dir}/index.html' |
|
|
|
|
|
|
|
|
paw_types = { |
|
|
'leftFront': [], |
|
|
'rightFront': [], |
|
|
'leftHind': [], |
|
|
'rightHind': [], |
|
|
'unknown': [] |
|
|
} |
|
|
|
|
|
type_map = { |
|
|
'LF': 'leftFront', |
|
|
'RF': 'rightFront', |
|
|
'LH': 'leftHind', |
|
|
'RH': 'rightHind' |
|
|
} |
|
|
|
|
|
for filename in html_files: |
|
|
for paw_code, paw_name in type_map.items(): |
|
|
if paw_code in filename: |
|
|
paw_types[paw_name].append(filename) |
|
|
break |
|
|
else: |
|
|
paw_types['unknown'].append(filename) |
|
|
|
|
|
|
|
|
with open(index_html, 'w', encoding='utf-8') as f: |
|
|
f.write(""" |
|
|
<!DOCTYPE html> |
|
|
<html> |
|
|
<head> |
|
|
<meta charset="UTF-8"> |
|
|
<title>足印3D可视化索引</title> |
|
|
<style> |
|
|
body { font-family: Arial, sans-serif; line-height: 1.6; margin: 20px; } |
|
|
h1 { color: #333; text-align: center; } |
|
|
h2 { color: #555; margin-top: 30px; } |
|
|
.container { display: flex; flex-wrap: wrap; justify-content: center; } |
|
|
.item { margin: 10px; text-align: center; } |
|
|
.item a { display: block; padding: 10px; border: 1px solid #ddd; border-radius: 5px; |
|
|
text-decoration: none; color: #555; transition: all 0.3s; } |
|
|
.item a:hover { background-color: #f5f5f5; transform: scale(1.05); } |
|
|
.leftFront a { border-color: #ff9999; } |
|
|
.rightFront a { border-color: #99ff99; } |
|
|
.leftHind a { border-color: #9999ff; } |
|
|
.rightHind a { border-color: #ffff99; } |
|
|
</style> |
|
|
</head> |
|
|
<body> |
|
|
<h1>足印3D交互式可视化索引</h1> |
|
|
""") |
|
|
|
|
|
|
|
|
type_labels = { |
|
|
'leftFront': '左前爪', |
|
|
'rightFront': '右前爪', |
|
|
'leftHind': '左后爪', |
|
|
'rightHind': '右后爪', |
|
|
'unknown': '未知类型' |
|
|
} |
|
|
|
|
|
for paw_type, files in paw_types.items(): |
|
|
if files: |
|
|
f.write(f'<h2>{type_labels[paw_type]}</h2>\n') |
|
|
f.write('<div class="container">\n') |
|
|
|
|
|
for filename in sorted(files): |
|
|
|
|
|
parts = filename.split('_') |
|
|
if len(parts) >= 2: |
|
|
cluster_id = parts[1] |
|
|
f.write(f'<div class="item {paw_type}">\n') |
|
|
f.write(f'<a href="{filename}" target="_blank">足印 #{cluster_id}</a>\n') |
|
|
f.write('</div>\n') |
|
|
|
|
|
f.write('</div>\n') |
|
|
|
|
|
f.write(""" |
|
|
</body> |
|
|
</html> |
|
|
""") |
|
|
|
|
|
print(f"足印3D可视化索引页面已创建: {index_html}") |
|
|
|
|
|
|
|
|
with open(index_html, 'rb') as f: |
|
|
html_content = f.read() |
|
|
index_base64 = base64.b64encode(html_content).decode('utf-8') |
|
|
|
|
|
return index_base64 |
|
|
|
|
|
def generate_footprint_timeline(self): |
|
|
"""生成足印步行图分析数据""" |
|
|
print("\n开始生成足印步行图...") |
|
|
|
|
|
|
|
|
timeline_dir = f'{self.result_dir}/plots/footprint_timeline' |
|
|
os.makedirs(timeline_dir, exist_ok=True) |
|
|
|
|
|
videos_dir = f'{self.result_dir}/videos' |
|
|
os.makedirs(videos_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
timeline_data = self._generate_footprint_timeline_video() |
|
|
|
|
|
return timeline_data |
|
|
|
|
|
def _generate_footprint_timeline_video(self): |
|
|
"""生成足印步行热图视频和图片序列,并返回base64编码的数据""" |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
from matplotlib.colors import LinearSegmentedColormap |
|
|
import os |
|
|
from matplotlib.patches import Rectangle |
|
|
|
|
|
|
|
|
timeline_dir = f'{self.result_dir}/plots/footprint_timeline' |
|
|
os.makedirs(timeline_dir, exist_ok=True) |
|
|
|
|
|
videos_dir = f'{self.result_dir}/videos' |
|
|
os.makedirs(videos_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
video_fps = self.fps |
|
|
|
|
|
|
|
|
paw_colors = { |
|
|
'LF': 'red', |
|
|
'RF': 'green', |
|
|
'LH': 'blue', |
|
|
'RH': 'yellow' |
|
|
} |
|
|
|
|
|
|
|
|
cmap_jet = plt.cm.get_cmap('jet') |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
|
|
|
cluster_to_prints = {} |
|
|
for footprint in self.footprint_areas: |
|
|
if footprint.area_id not in cluster_to_prints: |
|
|
cluster_to_prints[footprint.area_id] = [] |
|
|
|
|
|
|
|
|
simple_id = footprint.area_id |
|
|
if footprint.area_id.startswith("footprintArea_"): |
|
|
simple_id = footprint.area_id.replace("footprintArea_", "") |
|
|
|
|
|
cluster_to_prints[footprint.area_id].append({ |
|
|
'cluster_id': footprint.area_id, |
|
|
'simple_id': simple_id, |
|
|
'paw_type': footprint.paw_type, |
|
|
'start_frame': footprint.start_frame, |
|
|
'end_frame': footprint.end_frame, |
|
|
'position': footprint.position, |
|
|
'x': footprint.position['x'] + footprint.position['width']/2, |
|
|
'y': footprint.position['y'] + footprint.position['height']/2, |
|
|
'w': footprint.position['width'], |
|
|
'h': footprint.position['height'], |
|
|
'frame_id': footprint.start_frame |
|
|
}) |
|
|
|
|
|
|
|
|
key_footprints = [] |
|
|
cluster_heatmaps = {} |
|
|
|
|
|
for cluster_id, prints in cluster_to_prints.items(): |
|
|
|
|
|
sorted_prints = sorted(prints, key=lambda x: x['start_frame']) |
|
|
key_print = sorted_prints[len(sorted_prints)//2] |
|
|
key_footprints.append(key_print) |
|
|
|
|
|
|
|
|
if self.video_path: |
|
|
cap = cv2.VideoCapture(self.video_path) |
|
|
if not cap.isOpened(): |
|
|
print(f"无法打开视频: {self.video_path}") |
|
|
continue |
|
|
|
|
|
|
|
|
middle_frame = (key_print['start_frame'] + key_print['end_frame']) // 2 |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) |
|
|
ret, frame = cap.read() |
|
|
|
|
|
if not ret: |
|
|
print(f"无法读取帧: {middle_frame}") |
|
|
cap.release() |
|
|
continue |
|
|
|
|
|
|
|
|
x = max(0, int(key_print['position']['x'])) |
|
|
y = max(0, int(key_print['position']['y'])) |
|
|
w = int(key_print['position']['width']) |
|
|
h = int(key_print['position']['height']) |
|
|
|
|
|
if x+w > frame.shape[1] or y+h > frame.shape[0]: |
|
|
print(f"ROI超出图像范围: {x},{y},{w},{h}") |
|
|
cap.release() |
|
|
continue |
|
|
|
|
|
patch = frame[y:y+h, x:x+w] |
|
|
|
|
|
|
|
|
if len(patch.shape) == 2 or (len(patch.shape) == 3 and patch.shape[2] == 1): |
|
|
|
|
|
if len(patch.shape) == 3: |
|
|
gray = patch[:,:,0] |
|
|
else: |
|
|
gray = patch |
|
|
else: |
|
|
|
|
|
hsv = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV) |
|
|
|
|
|
|
|
|
lower_green = np.array([40, 40, 40]) |
|
|
upper_green = np.array([80, 255, 255]) |
|
|
|
|
|
|
|
|
mask = cv2.inRange(hsv, lower_green, upper_green) |
|
|
|
|
|
|
|
|
green_only = cv2.bitwise_and(patch, patch, mask=mask) |
|
|
|
|
|
|
|
|
gray = cv2.cvtColor(green_only, cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
|
|
|
smooth_gray = ndimage.gaussian_filter(gray, sigma=1.0) |
|
|
|
|
|
|
|
|
threshold = np.max(smooth_gray) * 0.1 if np.max(smooth_gray) > 0 else 0 |
|
|
filtered_gray = np.where(smooth_gray < threshold, 0, smooth_gray) |
|
|
|
|
|
|
|
|
max_val = np.max(filtered_gray) |
|
|
if max_val > 0: |
|
|
norm_gray = filtered_gray / max_val |
|
|
else: |
|
|
norm_gray = filtered_gray |
|
|
|
|
|
|
|
|
heatmap = cmap_jet(norm_gray) |
|
|
|
|
|
|
|
|
heatmap_bgr = (heatmap[:,:,:3][:,:,::-1] * 255).astype(np.uint8) |
|
|
|
|
|
|
|
|
alpha = np.where(norm_gray > 0, 0.7, 0).reshape(norm_gray.shape[0], norm_gray.shape[1], 1) |
|
|
heatmap_bgra = np.concatenate([heatmap_bgr, (alpha * 255).astype(np.uint8)], axis=2) |
|
|
|
|
|
|
|
|
cluster_heatmaps[cluster_id] = { |
|
|
'heatmap': heatmap_bgra, |
|
|
'print': key_print |
|
|
} |
|
|
|
|
|
cap.release() |
|
|
|
|
|
|
|
|
key_footprints.sort(key=lambda x: x['start_frame']) |
|
|
|
|
|
|
|
|
first_frame = min(p['start_frame'] for p in key_footprints) |
|
|
last_frame = max(p['end_frame'] for p in key_footprints) |
|
|
|
|
|
|
|
|
min_x = min(p['position']['x'] for p in key_footprints) |
|
|
max_x = max(p['position']['x'] + p['position']['width'] for p in key_footprints) |
|
|
min_y = min(p['position']['y'] for p in key_footprints) |
|
|
max_y = max(p['position']['y'] + p['position']['height'] for p in key_footprints) |
|
|
|
|
|
|
|
|
margin = 50 |
|
|
min_x = max(0, min_x - margin) |
|
|
min_y = max(0, min_y - margin) |
|
|
max_x += margin |
|
|
max_y += margin |
|
|
|
|
|
|
|
|
width = int(max_x - min_x) |
|
|
height = int(max_y - min_y) |
|
|
|
|
|
|
|
|
width = width + (width % 2) |
|
|
height = height + (height % 2) |
|
|
|
|
|
|
|
|
raw_video_path = f'{videos_dir}/footprint_timeline_raw.mp4' |
|
|
video_path = f'{videos_dir}/footprint_timeline.mp4' |
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
|
video_writer = cv2.VideoWriter(raw_video_path, fourcc, video_fps, (width, height)) |
|
|
|
|
|
|
|
|
frame_to_key_prints = {} |
|
|
for p in key_footprints: |
|
|
frame_id = p['start_frame'] |
|
|
if frame_id not in frame_to_key_prints: |
|
|
frame_to_key_prints[frame_id] = [] |
|
|
frame_to_key_prints[frame_id].append(p) |
|
|
|
|
|
|
|
|
progress_frames = {} |
|
|
progress_points = [0, 25, 50, 75, 100] |
|
|
total_frames = last_frame - first_frame |
|
|
progress_frame_ids = [first_frame + int(p * total_frames / 100) for p in progress_points] |
|
|
|
|
|
|
|
|
for frame_id in range(first_frame, last_frame + 1): |
|
|
|
|
|
canvas = np.zeros((height, width, 3), dtype=np.uint8) |
|
|
canvas.fill(255) |
|
|
|
|
|
|
|
|
for f_id in range(first_frame, frame_id + 1): |
|
|
if f_id in frame_to_key_prints: |
|
|
for footprint in frame_to_key_prints[f_id]: |
|
|
cluster_id = footprint['cluster_id'] |
|
|
if cluster_id in cluster_heatmaps: |
|
|
heatmap_data = cluster_heatmaps[cluster_id] |
|
|
heatmap = heatmap_data['heatmap'] |
|
|
|
|
|
|
|
|
x = int(footprint['position']['x'] - min_x) |
|
|
y = int(footprint['position']['y'] - min_y) |
|
|
w = int(footprint['position']['width']) |
|
|
h = int(footprint['position']['height']) |
|
|
|
|
|
|
|
|
scale_x = w / heatmap.shape[1] |
|
|
scale_y = h / heatmap.shape[0] |
|
|
scale = min(scale_x, scale_y) |
|
|
|
|
|
|
|
|
new_w = int(heatmap.shape[1] * scale) |
|
|
new_h = int(heatmap.shape[0] * scale) |
|
|
|
|
|
if new_w > 0 and new_h > 0: |
|
|
resized_heatmap = cv2.resize(heatmap, (new_w, new_h)) |
|
|
|
|
|
|
|
|
offset_x = (w - new_w) // 2 |
|
|
offset_y = (h - new_h) // 2 |
|
|
|
|
|
|
|
|
paw_type = footprint['paw_type'] |
|
|
simple_id = footprint['simple_id'] |
|
|
text = f"{paw_type} {simple_id}" |
|
|
cv2.putText(canvas, text, (x + w + 5, y + h//2), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) |
|
|
|
|
|
|
|
|
|
|
|
for i in range(new_h): |
|
|
for j in range(new_w): |
|
|
if 0 <= x + j + offset_x < width and 0 <= y + i + offset_y < height: |
|
|
alpha = resized_heatmap[i, j, 3] / 255.0 |
|
|
if alpha > 0: |
|
|
canvas[y + i + offset_y, x + j + offset_x] = \ |
|
|
(1 - alpha) * canvas[y + i + offset_y, x + j + offset_x] + \ |
|
|
alpha * resized_heatmap[i, j, :3] |
|
|
|
|
|
|
|
|
cv2.putText(canvas, f"Frame: {frame_id}", (10, 30), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
|
|
|
|
|
|
if frame_id in progress_frame_ids: |
|
|
progress = progress_points[progress_frame_ids.index(frame_id)] |
|
|
progress_img_path = f'{timeline_dir}/timeline_progress_{progress}.png' |
|
|
cv2.imwrite(progress_img_path, canvas) |
|
|
progress_frames[progress] = progress_img_path |
|
|
|
|
|
|
|
|
video_writer.write(canvas) |
|
|
|
|
|
|
|
|
video_writer.release() |
|
|
|
|
|
|
|
|
try: |
|
|
print(f"使用ffmpeg处理视频以提高兼容性...") |
|
|
|
|
|
|
|
|
if 'ffmpeg' in globals(): |
|
|
( |
|
|
ffmpeg |
|
|
.input(raw_video_path) |
|
|
.output(video_path, vcodec='libx264', crf=23, preset='fast', acodec='aac', audio_bitrate='128k') |
|
|
.overwrite_output() |
|
|
.run(quiet=True) |
|
|
) |
|
|
else: |
|
|
|
|
|
command = [ |
|
|
'ffmpeg', '-i', raw_video_path, |
|
|
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast', |
|
|
'-c:a', 'aac', '-b:a', '128k', |
|
|
'-y', video_path |
|
|
] |
|
|
subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
|
|
|
|
|
|
|
if os.path.exists(video_path) and os.path.getsize(video_path) > 0: |
|
|
os.remove(raw_video_path) |
|
|
print(f"视频处理完成: {video_path}") |
|
|
else: |
|
|
|
|
|
shutil.copy(raw_video_path, video_path) |
|
|
print(f"视频处理失败,使用原始视频") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"视频处理失败: {str(e)}") |
|
|
|
|
|
if os.path.exists(raw_video_path): |
|
|
shutil.copy(raw_video_path, video_path) |
|
|
|
|
|
|
|
|
cumulative_img_path = self._generate_cumulative_footprint_image(cluster_heatmaps, min_x, min_y, width, height, key_footprints) |
|
|
|
|
|
print(f"足印步行热图视频已保存: {video_path}") |
|
|
print(f"足印步行热图序列已保存: {timeline_dir}") |
|
|
|
|
|
|
|
|
video_base64 = "" |
|
|
with open(video_path, 'rb') as f: |
|
|
video_base64 = base64.b64encode(f.read()).decode('utf-8') |
|
|
|
|
|
final_img_base64 = "" |
|
|
if os.path.exists(cumulative_img_path): |
|
|
with open(cumulative_img_path, 'rb') as f: |
|
|
final_img_base64 = base64.b64encode(f.read()).decode('utf-8') |
|
|
|
|
|
|
|
|
return { |
|
|
'video_base64': video_base64, |
|
|
'final_image_base64': final_img_base64 |
|
|
} |
|
|
|
|
|
def _generate_cumulative_footprint_image(self, cluster_heatmaps, min_x, min_y, width, height, key_footprints): |
|
|
"""生成包含所有足印的累积热图图像""" |
|
|
import numpy as np |
|
|
import cv2 |
|
|
import os |
|
|
import matplotlib.pyplot as plt |
|
|
from matplotlib.patches import Patch |
|
|
|
|
|
|
|
|
cumulative_dir = f'{self.result_dir}/plots/footprint_cumulative' |
|
|
os.makedirs(cumulative_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
canvas = np.zeros((height, width, 3), dtype=np.uint8) |
|
|
canvas.fill(255) |
|
|
|
|
|
|
|
|
for footprint in key_footprints: |
|
|
cluster_id = footprint['cluster_id'] |
|
|
if cluster_id in cluster_heatmaps: |
|
|
heatmap_data = cluster_heatmaps[cluster_id] |
|
|
heatmap = heatmap_data['heatmap'] |
|
|
|
|
|
|
|
|
x = int(footprint['position']['x'] - min_x) |
|
|
y = int(footprint['position']['y'] - min_y) |
|
|
w = int(footprint['position']['width']) |
|
|
h = int(footprint['position']['height']) |
|
|
|
|
|
|
|
|
scale_x = w / heatmap.shape[1] |
|
|
scale_y = h / heatmap.shape[0] |
|
|
scale = min(scale_x, scale_y) |
|
|
|
|
|
|
|
|
new_w = int(heatmap.shape[1] * scale) |
|
|
new_h = int(heatmap.shape[0] * scale) |
|
|
|
|
|
if new_w > 0 and new_h > 0: |
|
|
resized_heatmap = cv2.resize(heatmap, (new_w, new_h)) |
|
|
|
|
|
|
|
|
offset_x = (w - new_w) // 2 |
|
|
offset_y = (h - new_h) // 2 |
|
|
|
|
|
|
|
|
paw_type = footprint['paw_type'] |
|
|
simple_id = footprint['simple_id'] |
|
|
text = f"{paw_type} {simple_id}" |
|
|
cv2.putText(canvas, text, (x + w + 5, y + h//2), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) |
|
|
|
|
|
|
|
|
for i in range(new_h): |
|
|
for j in range(new_w): |
|
|
if 0 <= x + j + offset_x < width and 0 <= y + i + offset_y < height: |
|
|
alpha = resized_heatmap[i, j, 3] / 255.0 |
|
|
if alpha > 0: |
|
|
canvas[y + i + offset_y, x + j + offset_x] = \ |
|
|
(1 - alpha) * canvas[y + i + offset_y, x + j + offset_x] + \ |
|
|
alpha * resized_heatmap[i, j, :3] |
|
|
|
|
|
|
|
|
cv2.putText(canvas, "All Gait Cumulative", (width//2 - 150, 30), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
|
|
|
|
|
|
cumulative_path = f'{cumulative_dir}/all_footprints_cumulative.png' |
|
|
cv2.imwrite(cumulative_path, canvas) |
|
|
|
|
|
|
|
|
self._create_footprint_legend(cumulative_dir) |
|
|
|
|
|
print(f"足印累积热图已保存: {cumulative_path}") |
|
|
return cumulative_path |
|
|
|
|
|
def _create_footprint_legend(self, output_dir): |
|
|
"""创建足印类型图例""" |
|
|
import matplotlib.pyplot as plt |
|
|
import numpy as np |
|
|
import os |
|
|
from matplotlib.patches import Patch |
|
|
|
|
|
|
|
|
paw_types = { |
|
|
'LF': {'color': 'red', 'label': 'Left Front'}, |
|
|
'RF': {'color': 'green', 'label': 'Right Front'}, |
|
|
'LH': {'color': 'blue', 'label': 'Left Hind'}, |
|
|
'RH': {'color': 'yellow', 'label': 'Right Hind'} |
|
|
} |
|
|
|
|
|
|
|
|
fig, ax = plt.subplots(figsize=(10, 4)) |
|
|
ax.axis('off') |
|
|
|
|
|
|
|
|
legend_elements = [] |
|
|
for paw_type, info in paw_types.items(): |
|
|
legend_elements.append( |
|
|
Patch(facecolor=info['color'], edgecolor='black', label=f"{paw_type} - {info['label']}") |
|
|
) |
|
|
|
|
|
|
|
|
ax.legend(handles=legend_elements, loc='center', fontsize=14, frameon=True, |
|
|
title='Footprint Type Legend', title_fontsize=16) |
|
|
|
|
|
|
|
|
plt.title('Heatmap colors indicate pressure intensity, border colors indicate footprint type', fontsize=14) |
|
|
plt.tight_layout() |
|
|
|
|
|
|
|
|
legend_path = f"{output_dir}/footprint_legend.png" |
|
|
plt.savefig(legend_path, dpi=200, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
def analyze_angular_velocity(self): |
|
|
"""分析并生成触地时间-角速度序列图 |
|
|
|
|
|
根据足印数据和关键点数据,计算身体角速度并与触地时间关联 |
|
|
|
|
|
Returns: |
|
|
dict: 包含统计数据和图表路径 |
|
|
""" |
|
|
print("\n开始分析触地时间-角速度序列图...") |
|
|
|
|
|
|
|
|
plot_dir = f'{self.result_dir}/plots' |
|
|
os.makedirs(plot_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
keypoints_data = self.record_params.get('bodyKeypoints', []) |
|
|
|
|
|
if len(keypoints_data) == 0: |
|
|
print("无关键点数据,无法生成角速度图") |
|
|
return {} |
|
|
|
|
|
|
|
|
all_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
frame_ids = [] |
|
|
angular_velocities = [] |
|
|
body_angles = [] |
|
|
prev_angle = None |
|
|
prev_frame_id = None |
|
|
|
|
|
|
|
|
colors = { |
|
|
'LF': 'red', |
|
|
'RF': 'green', |
|
|
'LH': 'blue', |
|
|
'RH': 'yellow' |
|
|
} |
|
|
|
|
|
for kp_data in keypoints_data: |
|
|
frame_id = kp_data['frame_id'] |
|
|
kps = kp_data['keypoints'] |
|
|
|
|
|
|
|
|
if 'nose' in kps and 'tail_base' in kps: |
|
|
nose = kps['nose'] |
|
|
tail_base = kps['tail_base'] |
|
|
|
|
|
|
|
|
dx = nose[0] - tail_base[0] |
|
|
dy = nose[1] - tail_base[1] |
|
|
angle = np.arctan2(dy, dx) * 180 / np.pi |
|
|
|
|
|
|
|
|
body_angles.append((frame_id, angle)) |
|
|
|
|
|
|
|
|
if prev_angle is not None and prev_frame_id is not None: |
|
|
|
|
|
angle_diff = angle - prev_angle |
|
|
if angle_diff > 180: |
|
|
angle_diff -= 360 |
|
|
elif angle_diff < -180: |
|
|
angle_diff += 360 |
|
|
|
|
|
|
|
|
angular_vel = angle_diff * self.fps / (frame_id - prev_frame_id) |
|
|
|
|
|
frame_ids.append(frame_id) |
|
|
angular_velocities.append(angular_vel) |
|
|
|
|
|
prev_angle = angle |
|
|
prev_frame_id = frame_id |
|
|
|
|
|
|
|
|
plt.figure(figsize=(14, 6)) |
|
|
|
|
|
|
|
|
plt.plot([id / self.fps for id in frame_ids], angular_velocities, 'b-', label='Angular Velocity', alpha=0.7) |
|
|
|
|
|
|
|
|
max_y = max(abs(min(angular_velocities)), abs(max(angular_velocities))) * 1.1 if angular_velocities else 10 |
|
|
for footprint in all_footprints: |
|
|
start_time = footprint.start_frame / self.fps |
|
|
paw_type = footprint.paw_type |
|
|
plt.axvline(x=start_time, color=colors[paw_type], linestyle='--', alpha=0.6) |
|
|
plt.text(start_time, max_y * 0.9, paw_type, rotation=90, color=colors[paw_type], alpha=0.8) |
|
|
|
|
|
|
|
|
plt.xlabel('Time (seconds)') |
|
|
plt.ylabel('Angular Velocity (deg/sec)') |
|
|
plt.title('Footfall Angular Velocity Timeline') |
|
|
plt.grid(True, alpha=0.3) |
|
|
plt.legend() |
|
|
|
|
|
|
|
|
output_path = f'{plot_dir}/angular_velocity_timeline.png' |
|
|
plt.savefig(output_path, dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
print(f"触地时间-角速度序列图已保存: {output_path}") |
|
|
|
|
|
|
|
|
stats = { |
|
|
'max_angular_velocity': max(angular_velocities) if angular_velocities else 0, |
|
|
'min_angular_velocity': min(angular_velocities) if angular_velocities else 0, |
|
|
'mean_angular_velocity': np.mean(angular_velocities) if angular_velocities else 0, |
|
|
'image_path': output_path |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
def analyze_velocity_timeline(self): |
|
|
"""分析并生成触地时间速度序列图 |
|
|
|
|
|
根据关键点数据计算线性速度并与触地时间关联 |
|
|
|
|
|
Returns: |
|
|
dict: 包含统计数据和图表路径 |
|
|
""" |
|
|
print("\n开始分析触地时间速度序列图...") |
|
|
|
|
|
|
|
|
plot_dir = f'{self.result_dir}/plots' |
|
|
os.makedirs(plot_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
keypoints_data = self.record_params.get('bodyKeypoints', []) |
|
|
|
|
|
if len(keypoints_data) == 0: |
|
|
print("无关键点数据,无法生成速度图") |
|
|
return {} |
|
|
|
|
|
|
|
|
all_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
frame_ids = [] |
|
|
velocities = [] |
|
|
prev_pos = None |
|
|
prev_frame_id = None |
|
|
|
|
|
|
|
|
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) |
|
|
|
|
|
|
|
|
colors = { |
|
|
'LF': 'red', |
|
|
'RF': 'green', |
|
|
'LH': 'blue', |
|
|
'RH': 'yellow' |
|
|
} |
|
|
|
|
|
|
|
|
for kp_data in keypoints_data: |
|
|
frame_id = kp_data['frame_id'] |
|
|
kps = kp_data['keypoints'] |
|
|
|
|
|
|
|
|
if 'mid' in kps: |
|
|
mid = kps['mid'] |
|
|
|
|
|
if prev_pos is not None and prev_frame_id is not None: |
|
|
|
|
|
dx = mid[0] - prev_pos[0] |
|
|
dy = mid[1] - prev_pos[1] |
|
|
distance = np.sqrt(dx**2 + dy**2) |
|
|
|
|
|
|
|
|
distance_cm = distance * scale_factor |
|
|
|
|
|
|
|
|
dt = (frame_id - prev_frame_id) / self.fps |
|
|
|
|
|
|
|
|
if dt > 0: |
|
|
velocity = distance_cm / dt |
|
|
frame_ids.append(frame_id) |
|
|
velocities.append(velocity) |
|
|
|
|
|
prev_pos = mid |
|
|
prev_frame_id = frame_id |
|
|
|
|
|
|
|
|
velocities_smooth = velocities.copy() if velocities else [] |
|
|
if len(velocities) > 11: |
|
|
try: |
|
|
velocities_smooth = savgol_filter(velocities, 11, 3) |
|
|
except Exception as e: |
|
|
print(f"平滑处理失败: {str(e)}") |
|
|
|
|
|
|
|
|
plt.figure(figsize=(14, 6)) |
|
|
|
|
|
|
|
|
if len(velocities_smooth) > 0: |
|
|
plt.plot([id / self.fps for id in frame_ids], velocities_smooth, 'b-', label='Velocity', alpha=0.7) |
|
|
|
|
|
|
|
|
max_y = float(np.max(velocities_smooth)) * 1.1 |
|
|
for footprint in all_footprints: |
|
|
start_time = footprint.start_frame / self.fps |
|
|
paw_type = footprint.paw_type |
|
|
plt.axvline(x=start_time, color=colors[paw_type], linestyle='--', alpha=0.6) |
|
|
plt.text(start_time, max_y * 0.9, paw_type, rotation=90, color=colors[paw_type], alpha=0.8) |
|
|
else: |
|
|
print("没有有效的速度数据") |
|
|
|
|
|
|
|
|
plt.xlabel('Time (seconds)') |
|
|
plt.ylabel('Velocity (cm/sec)') |
|
|
plt.title('Footfall Velocity Timeline') |
|
|
plt.grid(True, alpha=0.3) |
|
|
plt.legend() |
|
|
|
|
|
|
|
|
output_path = f'{plot_dir}/velocity_timeline.png' |
|
|
plt.savefig(output_path, dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
print(f"触地时间速度序列图已保存: {output_path}") |
|
|
|
|
|
|
|
|
stats = {} |
|
|
if len(velocities_smooth) > 0: |
|
|
stats = { |
|
|
'max_velocity': float(np.max(velocities_smooth)), |
|
|
'min_velocity': float(np.min(velocities_smooth)), |
|
|
'mean_velocity': float(np.mean(velocities_smooth)), |
|
|
'image_path': output_path |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
def analyze_tail_lateral_movement(self): |
|
|
"""分析并生成尾根点侧向移动相位图 |
|
|
|
|
|
根据关键点数据分析尾根点的侧向移动并与脚步相位关联 |
|
|
|
|
|
Returns: |
|
|
dict: 包含统计数据和图表路径 |
|
|
""" |
|
|
print("\n开始分析尾根点侧向移动相位图...") |
|
|
|
|
|
|
|
|
plot_dir = f'{self.result_dir}/plots' |
|
|
os.makedirs(plot_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
keypoints_data = self.record_params.get('bodyKeypoints', []) |
|
|
|
|
|
if len(keypoints_data) == 0: |
|
|
print("无关键点数据,无法生成尾根点移动图") |
|
|
return {} |
|
|
|
|
|
|
|
|
all_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
frame_ids = [] |
|
|
tail_lateral_pos = [] |
|
|
|
|
|
|
|
|
|
|
|
first_nose = None |
|
|
last_nose = None |
|
|
|
|
|
for kp_data in keypoints_data: |
|
|
kps = kp_data['keypoints'] |
|
|
if 'nose' in kps: |
|
|
if first_nose is None: |
|
|
first_nose = kps['nose'] |
|
|
last_nose = kps['nose'] |
|
|
|
|
|
if first_nose is None or last_nose is None: |
|
|
print("无法确定运动方向,无法生成尾根点侧向移动图") |
|
|
return {} |
|
|
|
|
|
|
|
|
direction_vector = [last_nose[0] - first_nose[0], last_nose[1] - first_nose[1]] |
|
|
if direction_vector[0] == 0 and direction_vector[1] == 0: |
|
|
print("无法确定运动方向,无法生成尾根点侧向移动图") |
|
|
return {} |
|
|
|
|
|
|
|
|
direction_mag = np.sqrt(direction_vector[0]**2 + direction_vector[1]**2) |
|
|
direction_unit = [direction_vector[0]/direction_mag, direction_vector[1]/direction_mag] |
|
|
|
|
|
|
|
|
perpendicular_unit = [-direction_unit[1], direction_unit[0]] |
|
|
|
|
|
|
|
|
colors = { |
|
|
'LF': 'red', |
|
|
'RF': 'green', |
|
|
'LH': 'blue', |
|
|
'RH': 'yellow' |
|
|
} |
|
|
|
|
|
|
|
|
for kp_data in keypoints_data: |
|
|
frame_id = kp_data['frame_id'] |
|
|
kps = kp_data['keypoints'] |
|
|
|
|
|
if 'mid' in kps and 'tail_base' in kps: |
|
|
mid = kps['mid'] |
|
|
tail_base = kps['tail_base'] |
|
|
|
|
|
|
|
|
relative_vector = [tail_base[0] - mid[0], tail_base[1] - mid[1]] |
|
|
|
|
|
|
|
|
lateral_projection = relative_vector[0] * perpendicular_unit[0] + relative_vector[1] * perpendicular_unit[1] |
|
|
|
|
|
frame_ids.append(frame_id) |
|
|
tail_lateral_pos.append(lateral_projection) |
|
|
|
|
|
|
|
|
tail_pos_smooth = tail_lateral_pos.copy() if tail_lateral_pos else [] |
|
|
if len(tail_lateral_pos) > 11: |
|
|
try: |
|
|
tail_pos_smooth = savgol_filter(tail_lateral_pos, 11, 3) |
|
|
except Exception as e: |
|
|
print(f"平滑处理失败: {str(e)}") |
|
|
|
|
|
|
|
|
plt.figure(figsize=(14, 6)) |
|
|
|
|
|
|
|
|
if len(tail_pos_smooth) > 0: |
|
|
plt.plot([id / self.fps for id in frame_ids], tail_pos_smooth, 'b-', label='Tail Lateral Position', alpha=0.7) |
|
|
|
|
|
|
|
|
y_min = float(np.min(tail_pos_smooth)) |
|
|
y_max = float(np.max(tail_pos_smooth)) |
|
|
|
|
|
for footprint in all_footprints: |
|
|
start_time = footprint.start_frame / self.fps |
|
|
paw_type = footprint.paw_type |
|
|
plt.axvline(x=start_time, color=colors[paw_type], linestyle='--', alpha=0.6) |
|
|
plt.text(start_time, y_max, paw_type, rotation=90, color=colors[paw_type], alpha=0.8) |
|
|
else: |
|
|
print("没有有效的尾根点数据") |
|
|
|
|
|
|
|
|
plt.xlabel('Time (seconds)') |
|
|
plt.ylabel('Tail Lateral Position (pixels)') |
|
|
plt.title('Tail Lateral Movement Phase') |
|
|
plt.grid(True, alpha=0.3) |
|
|
plt.legend() |
|
|
|
|
|
|
|
|
output_path = f'{plot_dir}/tail_lateral_movement.png' |
|
|
plt.savefig(output_path, dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
print(f"尾根点侧向移动相位图已保存: {output_path}") |
|
|
|
|
|
|
|
|
stats = {} |
|
|
if len(tail_pos_smooth) > 0: |
|
|
stats = { |
|
|
'max_lateral_position': float(np.max(tail_pos_smooth)), |
|
|
'min_lateral_position': float(np.min(tail_pos_smooth)), |
|
|
'mean_lateral_position': float(np.mean(tail_pos_smooth)), |
|
|
'image_path': output_path |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
def analyze_support_swing_phase(self): |
|
|
"""分析并生成支撑-摇摆相位图 |
|
|
|
|
|
可视化每个爪子的支撑和摇摆相位 |
|
|
|
|
|
Returns: |
|
|
dict: 包含统计数据和图表路径 |
|
|
""" |
|
|
print("\n开始分析支撑-摇摆相位图...") |
|
|
|
|
|
|
|
|
plot_dir = f'{self.result_dir}/plots' |
|
|
os.makedirs(plot_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
for paw_type in paw_groups: |
|
|
paw_groups[paw_type].sort(key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
colors = { |
|
|
'LF': 'red', |
|
|
'RF': 'green', |
|
|
'LH': 'blue', |
|
|
'RH': 'yellow' |
|
|
} |
|
|
|
|
|
|
|
|
all_footprints = [] |
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
all_footprints.extend(footprints) |
|
|
|
|
|
if not all_footprints: |
|
|
print("无足印数据,无法生成支撑-摇摆相位图") |
|
|
return {} |
|
|
|
|
|
|
|
|
time_min = min(fp.start_frame for fp in all_footprints) / self.fps |
|
|
time_max = max(fp.end_frame for fp in all_footprints) / self.fps |
|
|
|
|
|
|
|
|
time_range = time_max - time_min |
|
|
time_min -= time_range * 0.1 |
|
|
time_max += time_range * 0.1 |
|
|
|
|
|
|
|
|
plt.figure(figsize=(15, 6)) |
|
|
|
|
|
|
|
|
y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1} |
|
|
|
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
y_pos = y_positions[paw_type] |
|
|
|
|
|
for i, fp in enumerate(footprints): |
|
|
|
|
|
stance_start = fp.start_frame / self.fps |
|
|
stance_end = fp.end_frame / self.fps |
|
|
plt.hlines(y=y_pos, xmin=stance_start, xmax=stance_end, |
|
|
linewidth=10, color=colors[paw_type], alpha=0.7, label=paw_type if i==0 else "") |
|
|
|
|
|
|
|
|
plt.text(stance_start, y_pos+0.1, f"{paw_type}", fontsize=8, ha='left') |
|
|
|
|
|
|
|
|
if i < len(footprints) - 1: |
|
|
next_fp = footprints[i+1] |
|
|
swing_start = stance_end |
|
|
swing_end = next_fp.start_frame / self.fps |
|
|
|
|
|
|
|
|
plt.hlines(y=y_pos, xmin=swing_start, xmax=swing_end, |
|
|
linewidth=5, color=colors[paw_type], alpha=0.3, linestyle='--') |
|
|
|
|
|
|
|
|
plt.yticks(list(y_positions.values()), list(y_positions.keys())) |
|
|
plt.xlabel('Time (seconds)') |
|
|
plt.title('Support-Swing Phase Diagram') |
|
|
plt.xlim(time_min, time_max) |
|
|
plt.grid(True, alpha=0.3) |
|
|
|
|
|
|
|
|
from matplotlib.lines import Line2D |
|
|
legend_elements = [ |
|
|
Line2D([0], [0], color='black', linewidth=10, alpha=0.7, label='Support Phase'), |
|
|
Line2D([0], [0], color='black', linewidth=5, alpha=0.3, linestyle='--', label='Swing Phase') |
|
|
] |
|
|
plt.legend(handles=legend_elements, loc='upper right') |
|
|
|
|
|
|
|
|
output_path = f'{plot_dir}/support_swing_phase.png' |
|
|
plt.savefig(output_path, dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
print(f"支撑-摇摆相位图已保存: {output_path}") |
|
|
|
|
|
|
|
|
phase_stats = {} |
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
|
|
|
stance_times = [] |
|
|
swing_times = [] |
|
|
duty_factors = [] |
|
|
|
|
|
for i, fp in enumerate(footprints): |
|
|
stance_time = (fp.end_frame - fp.start_frame) / self.fps |
|
|
stance_times.append(stance_time) |
|
|
|
|
|
|
|
|
if i < len(footprints) - 1: |
|
|
next_fp = footprints[i+1] |
|
|
swing_time = (next_fp.start_frame - fp.end_frame) / self.fps |
|
|
swing_times.append(swing_time) |
|
|
|
|
|
cycle_time = stance_time + swing_time |
|
|
duty_factor = stance_time / cycle_time if cycle_time > 0 else 0 |
|
|
duty_factors.append(duty_factor) |
|
|
|
|
|
|
|
|
phase_stats[paw_type] = { |
|
|
'avg_stance_time': np.mean(stance_times) if stance_times else 0, |
|
|
'avg_swing_time': np.mean(swing_times) if swing_times else 0, |
|
|
'avg_duty_factor': np.mean(duty_factors) if duty_factors else 0, |
|
|
'max_duty_factor': max(duty_factors) if duty_factors else 0, |
|
|
'min_duty_factor': min(duty_factors) if duty_factors else 0 |
|
|
} |
|
|
|
|
|
return { |
|
|
'phase_stats': phase_stats, |
|
|
'image_path': output_path |
|
|
} |
|
|
|
|
|
def analyze_limb_duty_cycle(self): |
|
|
"""分析并生成肢体占空比图 (Duty Cycle) |
|
|
|
|
|
计算并可视化各肢体的占空比(支撑时间/周期时间) |
|
|
|
|
|
Returns: |
|
|
dict: 包含统计数据和图表路径 |
|
|
""" |
|
|
print("\n开始分析肢体占空比图...") |
|
|
|
|
|
|
|
|
plot_dir = f'{self.result_dir}/plots' |
|
|
os.makedirs(plot_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} |
|
|
for area in self.footprint_areas: |
|
|
paw_groups[area.paw_type].append(area) |
|
|
|
|
|
for paw_type in paw_groups: |
|
|
paw_groups[paw_type].sort(key=lambda x: x.start_frame) |
|
|
|
|
|
|
|
|
duty_cycle_data = {} |
|
|
cycle_times = {} |
|
|
|
|
|
for paw_type, footprints in paw_groups.items(): |
|
|
duty_cycles = [] |
|
|
stance_times = [] |
|
|
cycles = [] |
|
|
|
|
|
for i, fp in enumerate(footprints): |
|
|
|
|
|
stance_time = (fp.end_frame - fp.start_frame) / self.fps |
|
|
stance_times.append(stance_time) |
|
|
|
|
|
|
|
|
if i < len(footprints) - 1: |
|
|
next_fp = footprints[i+1] |
|
|
cycle_time = (next_fp.start_frame - fp.start_frame) / self.fps |
|
|
duty_cycle = stance_time / cycle_time if cycle_time > 0 else 0 |
|
|
|
|
|
duty_cycles.append(duty_cycle) |
|
|
cycles.append(cycle_time) |
|
|
|
|
|
duty_cycle_data[paw_type] = duty_cycles |
|
|
cycle_times[paw_type] = cycles |
|
|
|
|
|
|
|
|
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) |
|
|
|
|
|
|
|
|
paw_order = ['RF', 'LF', 'RH', 'LH'] |
|
|
box_data = [duty_cycle_data[paw] for paw in paw_order] |
|
|
box_colors = [self.colors[paw] for paw in paw_order] |
|
|
|
|
|
|
|
|
boxplots = ax1.boxplot(box_data, patch_artist=True, labels=paw_order) |
|
|
|
|
|
|
|
|
for patch, color in zip(boxplots['boxes'], box_colors): |
|
|
patch.set_facecolor(color) |
|
|
patch.set_alpha(0.6) |
|
|
|
|
|
|
|
|
for i, (paw, data) in enumerate(zip(paw_order, box_data)): |
|
|
if data: |
|
|
x = np.random.normal(i+1, 0.04, size=len(data)) |
|
|
ax1.scatter(x, data, alpha=0.6, color=self.colors[paw], s=30) |
|
|
|
|
|
ax1.set_title('Limb Duty Cycle Distribution') |
|
|
ax1.set_ylabel('Duty Cycle (Stance/Cycle)') |
|
|
ax1.set_ylim(0, 1) |
|
|
ax1.grid(True, alpha=0.3) |
|
|
|
|
|
|
|
|
means = [np.mean(cycle_times[paw]) if cycle_times[paw] else 0 for paw in paw_order] |
|
|
stds = [np.std(cycle_times[paw]) if cycle_times[paw] else 0 for paw in paw_order] |
|
|
|
|
|
|
|
|
bars = ax2.bar(paw_order, means, color=box_colors, alpha=0.7) |
|
|
ax2.errorbar(paw_order, means, yerr=stds, fmt='none', ecolor='black', capsize=5) |
|
|
|
|
|
|
|
|
for bar, mean in zip(bars, means): |
|
|
height = bar.get_height() |
|
|
ax2.text(bar.get_x() + bar.get_width()/2., height + 0.02, |
|
|
f'{mean:.2f}s', ha='center', va='bottom') |
|
|
|
|
|
ax2.set_title('Average Cycle Time') |
|
|
ax2.set_ylabel('Time (seconds)') |
|
|
ax2.grid(True, alpha=0.3) |
|
|
|
|
|
plt.tight_layout() |
|
|
|
|
|
|
|
|
output_path = f'{plot_dir}/limb_duty_cycle.png' |
|
|
plt.savefig(output_path, dpi=300, bbox_inches='tight') |
|
|
plt.close() |
|
|
|
|
|
print(f"肢体占空比图已保存: {output_path}") |
|
|
|
|
|
|
|
|
stats = {} |
|
|
|
|
|
for paw_type in paw_order: |
|
|
duty_cycles = duty_cycle_data[paw_type] |
|
|
stats[paw_type] = { |
|
|
'mean_duty_cycle': float(np.mean(duty_cycles)) if duty_cycles else 0, |
|
|
'std_duty_cycle': float(np.std(duty_cycles)) if duty_cycles else 0, |
|
|
'max_duty_cycle': float(max(duty_cycles)) if duty_cycles else 0, |
|
|
'min_duty_cycle': float(min(duty_cycles)) if duty_cycles else 0, |
|
|
'mean_cycle_time': float(np.mean(cycle_times[paw_type])) if cycle_times[paw_type] else 0 |
|
|
} |
|
|
|
|
|
return { |
|
|
'duty_cycle_stats': stats, |
|
|
'image_path': output_path |
|
|
} |
|
|
|
|
|
def main(): |
|
|
|
|
|
analyzer = GaitAnalysisReport('data/footprint_fixed_Exp2.json', |
|
|
pre_delay_ms=0, |
|
|
post_delay_ms=0) |
|
|
|
|
|
|
|
|
stance_stats = analyzer.analyze_stance_time() |
|
|
|
|
|
|
|
|
swing_stats = analyzer.analyze_swing_metrics() |
|
|
|
|
|
|
|
|
stride_stats = analyzer.analyze_stride_length() |
|
|
|
|
|
|
|
|
analyzer.analyze_gait_sequence() |
|
|
|
|
|
|
|
|
analyzer.analyze_pressure_timeline() |
|
|
|
|
|
|
|
|
stance_width_stats = analyzer.analyze_stance_width() |
|
|
|
|
|
|
|
|
paw_area_stats = analyzer.analyze_paw_area() |
|
|
|
|
|
|
|
|
print("\n=== 支撑时间统计 (秒) ===") |
|
|
for paw_type, data in stance_stats['stance_time'].items(): |
|
|
print(f"\n{paw_type}:") |
|
|
print(f" 平均值: {data['mean']:.3f} ± {data['std']:.3f}") |
|
|
print(f" 范围: {data['min']:.3f} - {data['max']:.3f}") |
|
|
|
|
|
print("\n=== 摆动时长统计 (秒) ===") |
|
|
for paw_type, data in swing_stats['swing_duration'].items(): |
|
|
print(f"\n{paw_type}:") |
|
|
print(f" 平均值: {data['mean']:.3f} ± {data['std']:.3f}") |
|
|
print(f" 范围: {data['min']:.3f} - {data['max']:.3f}") |
|
|
|
|
|
print("\n=== 摆动百分比统计 (%) ===") |
|
|
for paw_type, data in swing_stats['swing_percentage'].items(): |
|
|
print(f"\n{paw_type}:") |
|
|
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}") |
|
|
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}") |
|
|
|
|
|
print("\n=== 步幅长度统计 (像素) ===") |
|
|
for paw_type, data in stride_stats.items(): |
|
|
print(f"\n{paw_type}:") |
|
|
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}") |
|
|
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}") |
|
|
|
|
|
print("\n=== 触地宽度统计 (像素) ===") |
|
|
for limb_type, data in stance_width_stats.items(): |
|
|
if data: |
|
|
print(f"\n{limb_type}:") |
|
|
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}") |
|
|
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}") |
|
|
|
|
|
print("\n=== 脚爪面积统计 (像素²) ===") |
|
|
for paw_type, data in paw_area_stats.items(): |
|
|
print(f"\n{paw_type}:") |
|
|
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}") |
|
|
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}") |
|
|
|
|
|
|
|
|
detailed_data = analyzer.generate_detailed_table() |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |