mice-gait / src /gait_analysis_report.py
Hakureirm's picture
Update src/gait_analysis_report.py
bd47b19 verified
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
from datetime import datetime
import os
import cv2
import base64
from scipy.signal import savgol_filter
import logging
import io
import scipy.ndimage as ndimage
import shutil
import re
import subprocess
import tempfile
try:
import ffmpeg
except ImportError:
print("ffmpeg-python not installed. Using subprocess for video processing.")
@dataclass
class FootprintArea:
"""足印区域数据类"""
area_id: str
paw_type: str
start_frame: int
end_frame: int
position: Dict[str, int]
class GaitAnalysisReport:
def __init__(self, json_path: str, pre_delay_ms: int = 0, post_delay_ms: int = 0, video_path: str = None,
record_params: dict = None):
"""初始化报告生成器
Args:
json_path: 足印数据JSON文件路径
pre_delay_ms: 足印前的延时提取时间(毫秒)
post_delay_ms: 足印后的延时提取时间(毫秒)
video_path: 视频文件路径
record_params: 记录参数,包含实验记录相关信息
"""
self.json_path = json_path
self.pre_delay_ms = pre_delay_ms
self.post_delay_ms = post_delay_ms
self.video_path = video_path
self.record_params = record_params or {}
self.fps = 120 # 视频帧率
self.pre_delay_frames = int(pre_delay_ms / 1000 * self.fps) # 转换为帧数
self.post_delay_frames = int(post_delay_ms / 1000 * self.fps) # 转换为帧数
self.result_dir = self._create_result_dir()
self.footprint_areas = []
# 定义统一的颜色方案
self.colors = {
'RF': '#2196F3', # 蓝色
'RH': '#9C27B0', # 紫色
'LF': '#FFD700', # 黄色
'LH': '#4CAF50' # 绿色
}
self._load_data()
def _create_result_dir(self) -> str:
"""创建结果目录"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
result_dir = f"results/analysis_{timestamp}"
# 创建目录结构
for subdir in ['data', 'plots']:
os.makedirs(f"{result_dir}/{subdir}", exist_ok=True)
return result_dir
def _load_data(self):
"""加载并预处理JSON数据"""
with open(self.json_path, 'r') as f:
data = json.load(f)
# 按 footprintAreaId 分组收集所有足印
area_prints = {} # footprintAreaId -> list of footprints
# 遍历所有帧
for frame in data['frames']:
frame_id = frame['frameId']
for footprint in frame['footprints']:
area_id = footprint['footprintAreaId']
if area_id not in area_prints:
area_prints[area_id] = []
area_prints[area_id].append({
'frame_id': frame_id,
'position': footprint['position'],
'type': footprint['type']
})
# 为每个足印区域计算边界框和时间范围
for area_id, prints in area_prints.items():
# 收集所有坐标
x_coords = []
y_coords = []
frame_ids = []
paw_type = prints[0]['type'] # 取第一个足印的类型
for p in prints:
x_coords.extend([
p['position']['x'],
p['position']['x'] + p['position']['width']
])
y_coords.extend([
p['position']['y'],
p['position']['y'] + p['position']['height']
])
frame_ids.append(p['frame_id'])
# 计算边界框
x_min = min(x_coords)
y_min = min(y_coords)
width = max(x_coords) - x_min
height = max(y_coords) - y_min
# 添加到足印区域列表
self.footprint_areas.append(
FootprintArea(
area_id=area_id,
paw_type=paw_type,
start_frame=min(frame_ids),
end_frame=max(frame_ids),
position={
'x': x_min,
'y': y_min,
'width': width,
'height': height
}
)
)
def analyze_stance_time(self):
"""分析足印支撑时间并生成可视化"""
print("\n开始分析足印支撑时间...")
# 1. 按爪子类型分组并按开始帧排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append(area)
for paw_type in paw_groups:
paw_groups[paw_type].sort(key=lambda x: x.start_frame)
# 2. 计算支撑时间和周期
stance_times = {paw_type: [] for paw_type in paw_groups}
cycle_times = {paw_type: [] for paw_type in paw_groups}
for paw_type, footprints in paw_groups.items():
for i, footprint in enumerate(footprints):
# 计算支撑时间(帧数转换为秒)
stance_time = (footprint.end_frame - footprint.start_frame) / self.fps
stance_times[paw_type].append(stance_time)
# 计算周期时间(相邻两个同类型足印之间的时间)
if i < len(footprints) - 1:
cycle_time = (footprints[i+1].start_frame - footprint.start_frame) / self.fps
cycle_times[paw_type].append(cycle_time)
# 3. 计算统计数据
stats = {
'stance_time': {
paw_type: {
'mean': np.mean(times),
'std': np.std(times),
'min': np.min(times),
'max': np.max(times)
} for paw_type, times in stance_times.items()
},
'cycle_time': {
paw_type: {
'mean': np.mean(times),
'std': np.std(times),
'min': np.min(times),
'max': np.max(times)
} for paw_type, times in cycle_times.items() if times
}
}
# 4. 绘制支撑时序图
self._plot_stance_timeline(paw_groups)
# 5. 绘制支撑时间统计图
self._plot_stance_stats(stance_times)
# 6. 保存统计数据
self._save_stance_stats(stats)
print("足印支撑时间分析完成!")
return stats
def _get_time_range(self) -> Tuple[float, float]:
"""获取所有足印的时间范围,并添加边距"""
start_frames = [area.start_frame for area in self.footprint_areas]
end_frames = [area.end_frame for area in self.footprint_areas]
# 转换为秒并添加10%的边距
time_min = min(start_frames) / self.fps
time_max = max(end_frames) / self.fps
time_range = time_max - time_min
padding = time_range * 0.1 # 10%的边距
return max(0, time_min - padding), time_max + padding
def _plot_stance_timeline(self, paw_groups: Dict[str, List[FootprintArea]]):
"""绘制支撑时序图"""
plt.figure(figsize=(15, 6))
# 获取时间范围
time_min, time_max = self._get_time_range()
# 设置X轴范围
plt.xlim(time_min, time_max)
# 设置Y轴位置
y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1}
# 使用新的颜色方案绘制所有支撑条
for paw_type, footprints in paw_groups.items():
y = y_positions[paw_type]
for footprint in footprints:
start_time = footprint.start_frame / self.fps
end_time = footprint.end_frame / self.fps
plt.hlines(y=y, xmin=start_time, xmax=end_time,
color=self.colors[paw_type], linewidth=8, alpha=0.6)
plt.yticks(list(y_positions.values()), list(y_positions.keys()))
plt.xlabel('Time (seconds)')
plt.title('Gait Stance Timeline')
plt.grid(True, axis='x', alpha=0.3)
# 保存图片
plt.savefig(f'{self.result_dir}/plots/stance_timeline.png', dpi=300, bbox_inches='tight')
plt.close()
def _plot_stance_stats(self, stance_times: Dict[str, List[float]]):
"""绘制支撑时间统计图"""
plt.figure(figsize=(10, 6))
plt.style.use('seaborn-v0_8-whitegrid')
paw_order = ['RF', 'RH', 'LF', 'LH']
box_plot = plt.boxplot([stance_times[paw] for paw in paw_order],
tick_labels=paw_order,
patch_artist=True,
medianprops={'color': 'black'},
flierprops={'marker': 'o',
'markerfacecolor': 'none',
'markersize': 4})
# 使用新的颜色方案
for patch, paw in zip(box_plot['boxes'], paw_order):
patch.set_facecolor(self.colors[paw])
patch.set_alpha(0.6)
# 设置异常值颜色
for flier in box_plot['fliers']:
flier.set_color(self.colors[paw])
# 添加数据点
for i, paw in enumerate(paw_order, 1):
plt.scatter([i] * len(stance_times[paw]),
stance_times[paw],
color=self.colors[paw],
alpha=0.4,
s=30)
# 设置标题和标签
plt.title('Stance Time Distribution by Paw Type', pad=20)
plt.xlabel('Paw Type', labelpad=10)
plt.ylabel('Stance Time (s)', labelpad=10)
# 添加网格线
plt.grid(True, axis='y', linestyle='--', alpha=0.3)
# 调整布局
plt.tight_layout()
# 保存图片
plt.savefig(f'{self.result_dir}/plots/stance_stats.png',
dpi=300, bbox_inches='tight')
plt.close()
def _save_stance_stats(self, stats: Dict):
"""保存统计数据到CSV"""
# 创建支撑时间数据框
stance_df = pd.DataFrame.from_dict(
{(i, j): stats['stance_time'][i][j]
for i in stats['stance_time'].keys()
for j in stats['stance_time'][i].keys()},
orient='index'
)
# 创建周期时间数据框
cycle_df = pd.DataFrame.from_dict(
{(i, j): stats['cycle_time'][i][j]
for i in stats['cycle_time'].keys()
for j in stats['cycle_time'][i].keys()},
orient='index'
)
# 设置时间相关数据的精度为3位小数
stance_df = stance_df.round(3)
cycle_df = cycle_df.round(3)
# 保存到CSV
stance_df.to_csv(f'{self.result_dir}/data/stance_time_stats.csv')
cycle_df.to_csv(f'{self.result_dir}/data/cycle_time_stats.csv')
def analyze_gait_sequence(self):
"""分析步态序列并绘制示意图"""
print("\n开始分析步态序列...")
# 定义步态模式
GAIT_PATTERNS = {
'LH-RF-RH-LF': 'Aa',
'LH-LF-RH-RF': 'Ab',
'LH-RF-LF-RH': 'Ca',
'LH-RH-LF-RF': 'Cb',
'LH-RH-RF-LF': 'Ra',
'LH-LF-RF-RH': 'Rb'
}
# 1. 收集所有足印的开始时间并按时间排序
all_footprints = []
for area in self.footprint_areas:
all_footprints.append({
'paw_type': area.paw_type,
'start_time': area.start_frame / self.fps,
'color': self.colors[area.paw_type]
})
# 按时间排序
all_footprints.sort(key=lambda x: x['start_time'])
# 2. 识别步序模式
sequences = []
current_seq = []
sequence_labels = [] # 存储每个序列的标签和时间点
for fp in all_footprints:
if not current_seq and fp['paw_type'] == 'LH':
current_seq = [fp]
elif current_seq:
current_seq.append(fp)
if len(current_seq) == 4:
# 获取步序模式
pattern = '-'.join([p['paw_type'] for p in current_seq])
if pattern in GAIT_PATTERNS:
sequences.append(current_seq)
sequence_labels.append((
current_seq[0]['start_time'], # LH 时间点
GAIT_PATTERNS[pattern] # 步序类型
))
current_seq = []
# 3. 绘制步态序列图
plt.figure(figsize=(15, 4))
# 获取时间范围
time_min, time_max = self._get_time_range()
plt.xlim(time_min, time_max)
# 设置Y轴位置
y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1}
# 绘制所有足印点和连线
for i, footprint in enumerate(all_footprints):
y = y_positions[footprint['paw_type']]
x = footprint['start_time']
# 绘制圆点
plt.scatter(x, y, c=footprint['color'], s=100, alpha=0.6,
marker='o', label=footprint['paw_type'] if i < 4 else "")
# 添加连线
if i < len(all_footprints) - 1:
next_print = all_footprints[i+1]
plt.plot([x, next_print['start_time']],
[y, y_positions[next_print['paw_type']]],
'--', color='gray', alpha=0.3)
# 在LH点下方添加序列标签
for time, pattern in sequence_labels:
plt.text(time, 0.5, pattern, rotation=45, ha='right', va='top', fontsize=8)
plt.yticks(list(y_positions.values()), list(y_positions.keys()))
plt.xlabel('Time (seconds)')
plt.title('Gait Pattern')
plt.grid(True, alpha=0.3)
# 保存图片
plt.savefig(f'{self.result_dir}/plots/gait_sequence.png',
dpi=300, bbox_inches='tight')
plt.close()
print("步态序列分析完成!")
return sequence_labels
def analyze_pressure_timeline(self):
"""分析并绘制足印压力时序图"""
print("\n开始分析足印压力时序图...")
if not self.video_path:
raise ValueError("未指定视频文件路径")
# 打开视频
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
raise ValueError(f"无法打开视频文件: {self.video_path}")
# 设置压力阈值
pressure_threshold = 5
# 1. 按爪子类型分组并按时间排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append({
'start_frame': max(0, area.start_frame - self.pre_delay_frames),
'end_frame': area.end_frame + self.post_delay_frames,
'original_start': area.start_frame,
'original_end': area.end_frame,
'position': area.position
})
# 2. 创建图表
plt.figure(figsize=(15, 8))
for i, (paw_type, footprints) in enumerate(paw_groups.items()):
plt.subplot(4, 1, i+1)
for footprint in footprints:
# 初始化变量
max_pressures = [0]
avg_pressures = [0]
times = [footprint['start_frame'] / self.fps]
for frame_idx in range(footprint['start_frame'], footprint['end_frame']):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
continue
x = max(0, footprint['position']['x'])
y = max(0, footprint['position']['y'])
w = footprint['position']['width']
h = footprint['position']['height']
roi = frame[y:y+h, x:x+w]
if roi.size > 0:
# 只获取G通道的值
g_values = roi[:, :, 1]
max_pressures.append(float(np.max(g_values)))
avg_pressures.append(float(np.mean(g_values)))
times.append(frame_idx / self.fps)
# 添加结束点
max_pressures.append(0)
avg_pressures.append(0)
times.append(footprint['end_frame'] / self.fps)
# 绘制压力曲线
if times:
plt.plot(times, max_pressures, '--', color=self.colors[paw_type],
alpha=0.8, label='Max Pressure' if i == 0 else '')
plt.plot(times, avg_pressures, '-', color=self.colors[paw_type],
alpha=0.5, label='Avg Pressure' if i == 0 else '')
plt.fill_between(times, 0, avg_pressures, color=self.colors[paw_type], alpha=0.2)
# 添加图例
plt.legend(bbox_to_anchor=(1.05, 4.5), loc='upper left')
# 调整子图间距
plt.tight_layout()
# 保存图片
plt.savefig(f'{self.result_dir}/plots/pressure_timeline.png',
dpi=300, bbox_inches='tight')
plt.close()
cap.release()
print("Pressure timeline analysis completed!")
def analyze_area_timeline(self):
"""分析并绘制足印面积时序图"""
print("\n开始分析足印面积时序图...")
if not self.video_path:
raise ValueError("未指定视频文件路径")
# 打开视频
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
raise ValueError(f"无法打开视频文件: {self.video_path}")
# 设置压力阈值(用于计算有效足印面积)
pressure_threshold = 5
# 获取比例尺 (像素到毫米的转换)
# 假设1厘米 = 10毫米
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) * 10
# 1. 按爪子类型分组并按时间排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append({
'start_frame': max(0, area.start_frame - self.pre_delay_frames),
'end_frame': area.end_frame + self.post_delay_frames,
'original_start': area.start_frame,
'original_end': area.end_frame,
'position': area.position
})
# 2. 创建图表 - 增加高度以提供更多空间
plt.figure(figsize=(15, 10))
# 创建子图
for i, (paw_type, footprints) in enumerate(paw_groups.items()):
ax = plt.subplot(4, 1, i+1)
for footprint in footprints:
# 初始化变量
areas = [0]
times = [footprint['start_frame'] / self.fps]
for frame_idx in range(footprint['start_frame'], footprint['end_frame']):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
continue
x = max(0, footprint['position']['x'])
y = max(0, footprint['position']['y'])
w = footprint['position']['width']
h = footprint['position']['height']
roi = frame[y:y+h, x:x+w]
if roi.size > 0:
# 只获取G通道的值
g_values = roi[:, :, 1]
# 计算面积(高于阈值的像素数量)
area_pixels = np.sum(g_values > pressure_threshold)
# 转换为平方毫米
area_mm2 = area_pixels * (scale_factor ** 2)
areas.append(float(area_mm2))
times.append(frame_idx / self.fps)
# 添加结束点
areas.append(0)
times.append(footprint['end_frame'] / self.fps)
# 绘制面积曲线
if times:
plt.plot(times, areas, '-', color=self.colors[paw_type],
alpha=0.7, label='Area' if i == 0 else '')
plt.fill_between(times, 0, areas, color=self.colors[paw_type], alpha=0.2)
# 设置Y轴标签和标题
plt.ylabel(f'{paw_type} Area (mm²)')
plt.title(f'{paw_type} Footprint Area Timeline')
plt.grid(True, alpha=0.3)
# 只在最后一个子图上显示X轴标签
if i < 3: # 前三个子图
plt.setp(ax.get_xticklabels(), visible=False)
# 添加图例
plt.legend(bbox_to_anchor=(1.05, 4.5), loc='upper left')
# 添加X轴标签(仅在最后一个子图上)
plt.xlabel('Time (seconds)')
# 调整子图间距,增加垂直间距
plt.subplots_adjust(hspace=0.4)
# 保存图片
plt.savefig(f'{self.result_dir}/plots/area_timeline.png',
dpi=300, bbox_inches='tight')
plt.close()
cap.release()
print("足印面积时序图分析完成!")
def analyze_swing_metrics(self):
"""分析摆动相关指标"""
print("\n开始分析摆动相关指标...")
# 按爪子类型分组
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append(area)
# 对每个爪子类型计算指标
swing_metrics = {paw_type: [] for paw_type in paw_groups}
stride_durations = {paw_type: [] for paw_type in paw_groups}
swing_percentages = {paw_type: [] for paw_type in paw_groups}
for paw_type, footprints in paw_groups.items():
# 按时间排序
footprints.sort(key=lambda x: x.start_frame)
for i in range(len(footprints) - 1):
current_print = footprints[i]
next_print = footprints[i + 1]
# 计算摆动时长(从当前足印结束到下一个足印开始)
swing_duration = (next_print.start_frame - current_print.end_frame) / self.fps
swing_metrics[paw_type].append(swing_duration)
# 计算完整步幅时长
stride_duration = (next_print.start_frame - current_print.start_frame) / self.fps
stride_durations[paw_type].append(stride_duration)
# 计算摆动百分比
swing_percentage = (swing_duration / stride_duration) * 100
swing_percentages[paw_type].append(swing_percentage)
# 绘制摆动时长箱线图
self._plot_swing_stats(swing_metrics, "Swing Duration (s)", "swing_duration")
# 绘制摆动百分比箱线图
self._plot_swing_stats(swing_percentages, "Swing Percentage (%)", "swing_percentage")
# 保存统计数据
stats = {
'swing_duration': {
paw_type: {
'mean': np.mean(durations),
'std': np.std(durations),
'min': np.min(durations),
'max': np.max(durations)
} for paw_type, durations in swing_metrics.items() if durations
},
'swing_percentage': {
paw_type: {
'mean': np.mean(percentages),
'std': np.std(percentages),
'min': np.min(percentages),
'max': np.max(percentages)
} for paw_type, percentages in swing_percentages.items() if percentages
},
'stride_duration': {
paw_type: {
'mean': np.mean(durations),
'std': np.std(durations),
'min': np.min(durations),
'max': np.max(durations)
} for paw_type, durations in stride_durations.items() if durations
}
}
# 保存到CSV
self._save_swing_stats(stats)
print("摆动相关指标分析完成!")
return stats
def _plot_swing_stats(self, data: Dict[str, List[float]], ylabel: str, filename: str):
"""绘制摆动相关指标的统计图"""
plt.figure(figsize=(10, 6))
# 设置样式
plt.style.use('seaborn-v0_8-whitegrid')
colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'}
# 按照指定顺序处理数据
paw_order = ['RF', 'RH', 'LF', 'LH']
# 创建箱线图
box_plot = plt.boxplot([data[paw] for paw in paw_order],
tick_labels=paw_order,
patch_artist=True,
medianprops={'color': 'black'},
flierprops={'marker': 'o',
'markerfacecolor': 'none',
'markersize': 4})
# 设置箱体颜色和异常值颜色
for patch, paw in zip(box_plot['boxes'], paw_order):
patch.set_facecolor(colors[paw])
patch.set_alpha(0.6)
# 设置对应的异常值颜色
for flier in box_plot['fliers']:
flier.set_color(colors[paw])
# 添加数据点
for i, paw in enumerate(paw_order, 1):
if paw in data:
plt.scatter([i] * len(data[paw]),
data[paw],
color=colors[paw],
alpha=0.4,
s=30)
plt.title(f'{ylabel} Distribution by Paw Type', pad=20)
plt.xlabel('Paw Type', labelpad=10)
plt.ylabel(ylabel, labelpad=10)
plt.grid(True, axis='y', linestyle='--', alpha=0.3)
plt.tight_layout()
plt.savefig(f'{self.result_dir}/plots/{filename}.png',
dpi=300, bbox_inches='tight')
plt.close()
def _save_swing_stats(self, stats: Dict):
"""保存摆动相关指标的统计数据到CSV"""
for metric, data in stats.items():
df = pd.DataFrame.from_dict(
{(i, j): data[i][j]
for i in data.keys()
for j in data[i].keys()},
orient='index'
)
# 根据指标类型设置不同的精度
if metric in ['swing_duration', 'stride_duration']:
df = df.round(3) # 时间相关,3位小数
else:
df = df.round(1) # 其他指标,1位小数
df.to_csv(f'{self.result_dir}/data/{metric}_stats.csv')
def analyze_stride_length(self):
"""分析步幅长度"""
print("\n开始分析步幅长度...")
# 按爪子类型分组
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append(area)
# 计算每个爪子类型的步幅长度
stride_lengths = {paw_type: [] for paw_type in paw_groups}
for paw_type, footprints in paw_groups.items():
# 按时间排序
footprints.sort(key=lambda x: x.start_frame)
for i in range(len(footprints) - 1):
current_print = footprints[i]
next_print = footprints[i + 1]
# 计算两个足印中心点之间的距离
current_x = current_print.position['x'] + current_print.position['width'] / 2
next_x = next_print.position['x'] + next_print.position['width'] / 2
stride_length = abs(next_x - current_x) # 使用绝对值
stride_lengths[paw_type].append(stride_length)
# 绘制步幅长度统计图
self._plot_stride_stats(stride_lengths)
# 计算统计数据
stats = {
paw_type: {
'mean': np.mean(lengths),
'std': np.std(lengths),
'min': np.min(lengths),
'max': np.max(lengths)
} for paw_type, lengths in stride_lengths.items() if lengths
}
# 保存统计数据
df = pd.DataFrame.from_dict(
{(i, j): stats[i][j]
for i in stats.keys()
for j in stats[i].keys()},
orient='index'
)
df.to_csv(f'{self.result_dir}/data/stride_length_stats.csv')
print("步幅长度分析完成!")
return stats
def _plot_stride_stats(self, stride_lengths: Dict[str, List[float]]):
"""绘制步幅长度统计图"""
plt.figure(figsize=(10, 6))
plt.style.use('seaborn-v0_8-whitegrid')
colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'}
paw_order = ['RF', 'RH', 'LF', 'LH']
box_plot = plt.boxplot([stride_lengths[paw] for paw in paw_order],
tick_labels=paw_order,
patch_artist=True,
medianprops={'color': 'black'},
flierprops={'marker': 'o',
'markerfacecolor': 'none',
'markersize': 4})
# 设置箱体颜色
for patch, paw in zip(box_plot['boxes'], paw_order):
patch.set_facecolor(colors[paw])
patch.set_alpha(0.6)
# 添加数据点
for i, paw in enumerate(paw_order, 1):
plt.scatter([i] * len(stride_lengths[paw]),
stride_lengths[paw],
color=colors[paw],
alpha=0.4,
s=30)
plt.title('Stride Length Distribution by Paw Type', pad=20)
plt.xlabel('Paw Type', labelpad=10)
plt.ylabel('Stride Length (pixels)', labelpad=10)
plt.grid(True, axis='y', linestyle='--', alpha=0.3)
plt.tight_layout()
plt.savefig(f'{self.result_dir}/plots/stride_length.png',
dpi=300, bbox_inches='tight')
plt.close()
def analyze_stance_width(self):
"""分析触地宽度(前后肢之间的左右距离)"""
print("\n开始分析触地宽度...")
# 按爪子类型分组
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append(area)
# 计算前肢和后肢的触地宽度
front_widths = [] # RF和LF之间的宽度
hind_widths = [] # RH和LH之间的宽度
# 按时间排序
for paw_type in paw_groups:
paw_groups[paw_type].sort(key=lambda x: x.start_frame)
# 计算每个时间点的触地宽度
for rf in paw_groups['RF']:
rf_center_x = rf.position['x'] + rf.position['width'] / 2
rf_frame = rf.start_frame
# 寻找最近时间的LF足印
closest_lf = min(paw_groups['LF'],
key=lambda x: abs(x.start_frame - rf_frame),
default=None)
if closest_lf and abs(closest_lf.start_frame - rf_frame) < 10: # 限制时间差
lf_center_x = closest_lf.position['x'] + closest_lf.position['width'] / 2
front_width = abs(rf_center_x - lf_center_x)
front_widths.append(front_width)
# 同样计算后肢宽度
for rh in paw_groups['RH']:
rh_center_x = rh.position['x'] + rh.position['width'] / 2
rh_frame = rh.start_frame
closest_lh = min(paw_groups['LH'],
key=lambda x: abs(x.start_frame - rh_frame),
default=None)
if closest_lh and abs(closest_lh.start_frame - rh_frame) < 10:
lh_center_x = closest_lh.position['x'] + closest_lh.position['width'] / 2
hind_width = abs(rh_center_x - lh_center_x)
hind_widths.append(hind_width)
# 计算统计数据
stats = {
'front_width': {
'mean': np.mean(front_widths),
'std': np.std(front_widths),
'min': np.min(front_widths),
'max': np.max(front_widths)
} if front_widths else {},
'hind_width': {
'mean': np.mean(hind_widths),
'std': np.std(hind_widths),
'min': np.min(hind_widths),
'max': np.max(hind_widths)
} if hind_widths else {}
}
# 绘制统计图
self._plot_stance_width_stats(front_widths, hind_widths)
# 保存统计数据
df = pd.DataFrame.from_dict(stats, orient='index')
df.to_csv(f'{self.result_dir}/data/stance_width_stats.csv')
print("触地宽度分析完成!")
return stats
def _plot_stance_width_stats(self, front_widths: List[float], hind_widths: List[float]):
"""绘制触地宽度统计图"""
plt.figure(figsize=(8, 6))
plt.style.use('seaborn-v0_8-whitegrid')
# 使用前肢和后肢的对应颜色
colors = {
'Front': self.colors['RF'], # 使用前肢蓝色
'Hind': self.colors['RH'] # 使用后肢紫色
}
data = [front_widths, hind_widths]
labels = ['Front', 'Hind']
box_plot = plt.boxplot(data,
tick_labels=labels,
patch_artist=True,
medianprops={'color': 'black'},
flierprops={'marker': 'o',
'markerfacecolor': 'none',
'markersize': 4})
# 设置箱体颜色
for patch, label in zip(box_plot['boxes'], labels):
patch.set_facecolor(colors[label])
patch.set_alpha(0.6)
# 添加数据点
for i, (widths, label) in enumerate(zip([front_widths, hind_widths], labels), 1):
plt.scatter([i] * len(widths),
widths,
color=colors[label],
alpha=0.4,
s=30)
plt.title('Stance Width Distribution', pad=20)
plt.xlabel('Limb Type', labelpad=10)
plt.ylabel('Width (pixels)', labelpad=10)
plt.grid(True, axis='y', linestyle='--', alpha=0.3)
plt.tight_layout()
plt.savefig(f'{self.result_dir}/plots/stance_width.png',
dpi=300, bbox_inches='tight')
plt.close()
def analyze_paw_area(self):
"""分析脚爪面积(基于G通道的压力区域)"""
print("\n开始分析脚爪面积...")
# 打开视频
cap = cv2.VideoCapture('exp_videos/Exp2.mp4')
if not cap.isOpened():
raise ValueError("Cannot open video file")
# 按爪子类型分组
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append(area)
# 计算每个足印的有效面积
paw_areas = {paw_type: [] for paw_type in paw_groups}
# 设置压力阈值(G通道)
pressure_threshold = 5
for paw_type, footprints in paw_groups.items():
for footprint in footprints:
# 初始化变量
max_area = 0
max_intensity = 0
avg_intensity = 0
# 获取足印最大压力帧
for frame_idx in range(footprint.start_frame, footprint.end_frame):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
continue
# 提取足印区域
x = footprint.position['x']
y = footprint.position['y']
w = footprint.position['width']
h = footprint.position['height']
# 确保坐标在有效范围内
x = max(0, x)
y = max(0, y)
roi = frame[y:y+h, x:x+w]
if roi.size > 0:
# 提取G通道并计算有效面积
g_channel = roi[:, :, 1]
current_area = np.sum(g_channel > pressure_threshold)
max_area = max(max_area, current_area)
max_intensity = max(max_intensity, np.max(g_channel))
avg_intensity = max(avg_intensity, np.mean(g_channel))
if max_area > 0:
paw_areas[paw_type].append(max_area)
# 计算统计数据
stats = {
paw_type: {
'mean': np.mean(areas),
'std': np.std(areas),
'min': np.min(areas),
'max': np.max(areas)
} for paw_type, areas in paw_areas.items() if areas
}
# 绘制统计图
self._plot_paw_area_stats(paw_areas)
# 保存统计数据
df = pd.DataFrame.from_dict(
{(i, j): stats[i][j]
for i in stats.keys()
for j in stats[i].keys()},
orient='index'
)
df.to_csv(f'{self.result_dir}/data/paw_area_stats.csv')
cap.release()
print("脚爪面积分析完成!")
return stats
def _plot_paw_area_stats(self, paw_areas: Dict[str, List[float]]):
"""绘制脚爪面积统计图"""
plt.figure(figsize=(10, 6))
plt.style.use('seaborn-v0_8-whitegrid')
colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'}
paw_order = ['RF', 'RH', 'LF', 'LH']
box_plot = plt.boxplot([paw_areas[paw] for paw in paw_order],
tick_labels=paw_order,
patch_artist=True,
medianprops={'color': 'black'},
flierprops={'marker': 'o',
'markerfacecolor': 'none',
'markersize': 4})
# 设置箱体颜色
for patch, paw in zip(box_plot['boxes'], paw_order):
patch.set_facecolor(colors[paw])
patch.set_alpha(0.6)
# 添加数据点
for i, paw in enumerate(paw_order, 1):
plt.scatter([i] * len(paw_areas[paw]),
paw_areas[paw],
color=colors[paw],
alpha=0.4,
s=30)
plt.title('Paw Area Distribution by Paw Type', pad=20)
plt.xlabel('Paw Type', labelpad=10)
plt.ylabel('Area (pixels²)', labelpad=10)
plt.grid(True, axis='y', linestyle='--', alpha=0.3)
plt.tight_layout()
plt.savefig(f'{self.result_dir}/plots/paw_area.png',
dpi=300, bbox_inches='tight')
plt.close()
def generate_detailed_table(self):
print("\n开始生成详细数据表...")
if not self.video_path:
raise ValueError("未指定视频文件路径")
# 添加调试日志
print(f"\n检查关键点数据:")
print(f"record_params keys: {self.record_params.keys()}")
if 'bodyKeypoints' in self.record_params:
print(f"bodyKeypoints 数量: {len(self.record_params['bodyKeypoints'])}")
if self.record_params['bodyKeypoints']:
print(f"第一个关键点数据示例: {self.record_params['bodyKeypoints'][0]}")
else:
print("未找到 bodyKeypoints 数据")
# 创建帧到关键点的映射
keypoints_by_frame = {}
for kp_data in self.record_params.get('bodyKeypoints', []):
frame_id = kp_data['frame_id']
keypoints_by_frame[frame_id] = kp_data['keypoints']
print(f"处理后的关键点帧数: {len(keypoints_by_frame)}")
# 打开视频获取足印图像
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
raise ValueError(f"无法打开视频文件: {self.video_path}")
detailed_data = []
movement_data = [] # 新增:存储运动方向数据
# 按时间顺序排序所有足印
sorted_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame)
for area in sorted_footprints:
# 1. 获取基本信息
footprint_data = {
'paw_type': area.paw_type,
'initial_contact': area.start_frame / self.fps,
'stand_duration': (area.end_frame - area.start_frame) / self.fps,
'max_contact': area.end_frame / self.fps
}
# 2. 提取中间帧的图像
middle_frame = area.start_frame + (area.end_frame - area.start_frame) // 2
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
ret, frame = cap.read()
if ret:
# 获取该帧的关键点
if middle_frame in keypoints_by_frame:
kp = keypoints_by_frame[middle_frame]
nose = kp['nose']
tail_base = kp['tail_base']
# 计算全身图像的边界(增加10%余量)
x_min = min(nose[0], tail_base[0])
x_max = max(nose[0], tail_base[0])
margin = (x_max - x_min) * 0.1
x_min = max(0, int(x_min - margin))
x_max = min(frame.shape[1], int(x_max + margin))
# 计算关键点在截取图片中的相对坐标
nose_relative_x = int(nose[0] - x_min)
nose_relative_y = int(nose[1]) # y坐标保持不变
tail_base_relative_x = int(tail_base[0] - x_min)
tail_base_relative_y = int(tail_base[1]) # y坐标保持不变
# 提取全身图像
full_body_roi = frame[0:frame.shape[0], x_min:x_max]
_, buffer_full = cv2.imencode('.png', full_body_roi)
footprint_data['image_fullbody'] = base64.b64encode(buffer_full).decode('utf-8')
# 添加关键点完整坐标
footprint_data['keypoints'] = {
'nose': {'x': nose_relative_x, 'y': nose_relative_y},
'tail_base': {'x': tail_base_relative_x, 'y': tail_base_relative_y}
}
# 提取足印ROI(原有代码)
x = max(0, area.position['x'])
y = max(0, area.position['y'])
w = area.position['width']
h = area.position['height']
roi = frame[y:y+h, x:x+w]
# 只保留G通道
roi_g = roi.copy()
roi_g[:, :, 0] = 0
roi_g[:, :, 2] = 0
_, buffer = cv2.imencode('.png', roi_g)
footprint_data['image'] = base64.b64encode(buffer).decode('utf-8')
footprint_data['frame_id'] = middle_frame
# 3. 计算足印尺寸
footprint_data['print_length'] = area.position['height'] / 100 # 转换为厘米
footprint_data['print_width'] = area.position['width'] / 100 # 转换为厘米
# 4. 计算压力区域和强度
max_intensity = 0
avg_intensity = 0
area_size = 0
pressure_threshold = 30
for frame_idx in range(area.start_frame, area.end_frame):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if ret:
roi = frame[y:y+h, x:x+w]
if roi.size > 0:
g_channel = roi[:, :, 1]
current_area = np.sum(g_channel > pressure_threshold)
area_size = max(area_size, current_area)
max_intensity = max(max_intensity, np.max(g_channel))
avg_intensity = max(avg_intensity, np.mean(g_channel))
footprint_data['print_area'] = area_size / 100 # 转换为平方厘米
footprint_data['max_intensity'] = int(max_intensity)
footprint_data['average_intensity'] = round(avg_intensity, 2)
detailed_data.append(footprint_data)
cap.release()
# 计算平滑的运动方向
movement_angles = self._calculate_smooth_movement_angles()
for frame_id, angle in movement_angles.items():
movement_data.append({
'frame_id': frame_id,
'movement_angle': angle
})
# 创建最终的数据结构
output_data = {
"base_footprint_data": detailed_data,
"movement_direction_data": movement_data # 新增:作为并列的数据
}
# 保存为JSON文件
output_path = f'{self.result_dir}/data/detailed_footprint_data.json'
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"详细数据表已生成:{output_path}")
# 添加验证日志
print(f"\n数据统计:")
print(f"- 足印数据条数: {len(detailed_data)}")
print(f"- 运动方向数据条数: {len(movement_data)}")
print(f"- 包含全身图像的足印数: {sum(1 for d in detailed_data if 'image_fullbody' in d)}")
print(f"- 包含足印图像的足印数: {sum(1 for d in detailed_data if 'image' in d)}")
# 检查第一条数据的结构(去除图像数据以便打印)
if detailed_data:
sample_data = detailed_data[0].copy()
if 'image' in sample_data:
sample_data['image'] = f"[base64 string of length {len(sample_data['image'])}]"
if 'image_fullbody' in sample_data:
sample_data['image_fullbody'] = f"[base64 string of length {len(sample_data['image_fullbody'])}]"
print(f"\n示例数据结构:")
print(json.dumps(sample_data, indent=2))
return output_data
def _calculate_smooth_movement_angles(self):
"""计算平滑的运动方向角度"""
# 收集所有mid点
mid_points = []
frame_ids = []
for kp_data in self.record_params.get('bodyKeypoints', []):
frame_ids.append(kp_data['frame_id'])
mid_points.append(kp_data['keypoints']['mid'])
if not mid_points:
return {}
# 转换为numpy数组
points = np.array(mid_points)
# 使用Savitzky-Golay滤波器平滑轨迹
window_length = min(len(points), 15) # 窗口长度必须是奇数
if window_length % 2 == 0:
window_length -= 1
if window_length >= 3:
smooth_x = savgol_filter(points[:, 0], window_length, 3)
smooth_y = savgol_filter(points[:, 1], window_length, 3)
else:
smooth_x = points[:, 0]
smooth_y = points[:, 1]
# 计算每个点的切线角度
angles = {}
for i in range(len(smooth_x)-1):
dx = smooth_x[i+1] - smooth_x[i]
dy = smooth_y[i+1] - smooth_y[i]
angle = np.degrees(np.arctan2(dy, dx))
angles[frame_ids[i]] = angle
# 处理最后一个点
if frame_ids:
angles[frame_ids[-1]] = angles[frame_ids[-2]] if frame_ids[-2] in angles else 0
return angles
def generate_collection_table(self):
"""生成采集数据表格(Overview)
从record_params获取:实验名称、开始时间、方向等信息
计算得出:运行时长、平均速度、步数、步频
"""
# 获取输入参数(使用默认值避免报错)
direction_map = {
"left_to_right": "L to R",
"right_to_left": "R to L"
}
input_params = {
"name": self.record_params.get("name", "未命名记录"),
"start_time": self.record_params.get("start_time", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
"direction": direction_map.get(self.record_params.get("direction", "left_to_right"), "L to R"),
"walkway_length": f"{self.record_params.get('actual_length', 20)} cm", # 使用actual_length作为步道长度
"walkway_width": f"{self.record_params.get('crop_height', 152)/8:.1f} cm", # 使用crop_height估算步道宽度
"weight": f"{self.record_params.get('weight', 0)} g" # 添加体重信息
}
# 计算运行时长(从第一个足印到最后一个足印的时间)
sorted_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame)
if sorted_footprints:
first_frame = sorted_footprints[0].start_frame
last_frame = sorted_footprints[-1].end_frame
duration_seconds = (last_frame - first_frame) / self.fps
# 计算步数
steps_count = len(sorted_footprints)
# 计算步频(步数/时间)
cadence = steps_count / duration_seconds if duration_seconds > 0 else 0
# 计算平均速度(步道长度/时间)
try:
walkway_length_cm = float(input_params["walkway_length"].split()[0]) # 提取数字部分
average_speed = walkway_length_cm / duration_seconds
except (ValueError, IndexError):
average_speed = 0
else:
duration_seconds = 0
steps_count = 0
cadence = 0
average_speed = 0
# 组合所有数据
collection_data = {
# 从输入参数获取的数据
"name": input_params["name"],
"start_time": input_params["start_time"],
"direction": input_params["direction"],
"walkway_length": input_params["walkway_length"],
"walkway_width": input_params["walkway_width"],
"weight": input_params["weight"],
# 计算得出的数据
"run_duration": f"{duration_seconds:.1f} s",
"average_speed": f"{average_speed:.3f} cm/s",
"steps_count": steps_count,
"cadence": f"{cadence:.1f} steps/sec"
}
return collection_data
def generate_paw_statistics(self) -> dict:
"""生成足印统计数据表格,按照不同类别组织数据
Returns:
dict: 包含每个足印的统计数据,按类别分组
"""
print("\n开始生成足印统计数据...")
# 获取比例尺 (pixels to cm)
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152)
# 按爪子类型分组并按时间排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append(area)
for paw_type in paw_groups:
paw_groups[paw_type].sort(key=lambda x: x.start_frame)
# 打开视频获取压力数据
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
raise ValueError("无法打开视频文件")
paw_stats = {}
for paw_type, footprints in paw_groups.items():
paw_stats[paw_type] = []
for i, footprint in enumerate(footprints):
# 基础数据计算
x = footprint.position['x']
y = footprint.position['y']
width = footprint.position['width']
height = footprint.position['height']
# 压力数据计算
max_intensity = 0
mean_intensity = 0
min_intensity = 255
max_area = 0
pressure_threshold = 5
# 遍历足印的所有帧获取压力数据
for frame_idx in range(footprint.start_frame, footprint.end_frame + 1):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if ret:
roi = frame[y:y+height, x:x+width]
if roi.size > 0:
g_channel = roi[:, :, 1]
current_max = np.max(g_channel)
current_mean = np.mean(g_channel)
# 修改这里:添加安全检查
pressure_pixels = g_channel[g_channel > pressure_threshold]
if pressure_pixels.size > 0:
current_min = np.min(pressure_pixels)
min_intensity = min(min_intensity, current_min)
current_area = np.sum(g_channel > pressure_threshold)
max_intensity = max(max_intensity, current_max)
mean_intensity = max(mean_intensity, current_mean)
max_area = max(max_area, current_area)
# 计算支撑和摆动相关数据
initial_contact = footprint.start_frame / self.fps
max_contact = footprint.end_frame / self.fps
stand_duration = (footprint.end_frame - footprint.start_frame) / self.fps
swing_time = 0
swing_speed = 0
stride_length = 0
if i < len(footprints) - 1:
next_print = footprints[i + 1]
swing_time = (next_print.start_frame - footprint.end_frame) / self.fps
stride_length = abs(
(next_print.position['x'] + next_print.position['width']/2) -
(x + width/2)
) * scale_factor
if swing_time > 0:
swing_speed = stride_length / swing_time
# 组织数据结构
footprint_data = {
# Footprint Basic Data
"basic_data": {
"x_position": round(x * scale_factor, 1),
"y_position": round(y * scale_factor, 1),
"length": round(height * scale_factor, 1),
"width": round(width * scale_factor, 1),
"max_area": round(max_area * (scale_factor ** 2), 1)
},
# Time Points
"time_points": {
"initial_contact": round(initial_contact, 3),
"stand": round(stand_duration, 3),
"max_contact": round(max_contact, 3)
},
# Intensity
"intensity": {
"max_intensity": int(max_intensity),
"min_intensity": int(min_intensity) if min_intensity < 255 else 0,
"mean_intensity": round(mean_intensity, 1),
"max_contact_ratio": round((max_intensity / 255) * 100, 1),
"max_intensity_ratio": round((mean_intensity / max_intensity) * 100, 1) if max_intensity > 0 else 0
},
# Support
"support": {
"support_duration": round(stand_duration, 3),
"support_ratio": round((stand_duration / (stand_duration + swing_time)) * 100, 1) if (stand_duration + swing_time) > 0 else 0,
"initial_bipedal_support": round(initial_contact, 3),
"final_bipedal_support": round(max_contact, 3)
},
# Toe
"toe": {
"toe_spread": round(width * scale_factor, 1),
"intermediate_toe_spread": round(width * scale_factor * 0.5, 1),
"angle_between_motion": 10, # 简化处理,使用固定值
"angle_between_body_axis": 10 # 简化处理,使用固定值
},
# Other
"other": {
"swing": round(swing_time, 3),
"swing_speed": round(swing_speed, 1),
"stride_length": round(stride_length, 1),
"step_cycle": round(stand_duration + swing_time, 3),
"body_speed": round(stride_length / (stand_duration + swing_time), 1) if (stand_duration + swing_time) > 0 else 0
}
}
paw_stats[paw_type].append(footprint_data)
cap.release()
print("足印统计数据生成完成!")
return paw_stats
def generate_step_sequence_table(self) -> dict:
"""生成步序数据统计表格
Returns:
dict: 包含步序统计数据的字典
"""
print("\n开始生成步序统计数据...")
# 定义步态模式
GAIT_PATTERNS = {
'LH-RF-RH-LF': 'Aa',
'LH-LF-RH-RF': 'Ab',
'LH-RF-LF-RH': 'Ca',
'LH-RH-LF-RF': 'Cb',
'LH-RH-RF-LF': 'Ra',
'LH-LF-RF-RH': 'Rb'
}
# 收集所有足印的开始时间并按时间排序
all_footprints = []
for area in self.footprint_areas:
all_footprints.append({
'paw_type': area.paw_type,
'start_time': area.start_frame / self.fps
})
# 按时间排序
all_footprints.sort(key=lambda x: x['start_time'])
# 识别步序模式
sequences = []
current_seq = []
for fp in all_footprints:
if not current_seq and fp['paw_type'] == 'LH':
current_seq = [fp]
elif current_seq:
current_seq.append(fp)
if len(current_seq) == 4:
# 获取步序模式
pattern = '-'.join([p['paw_type'] for p in current_seq])
if pattern in GAIT_PATTERNS:
sequences.append(GAIT_PATTERNS[pattern])
current_seq = []
# 统计各种模式的出现次数
pattern_counts = {
'Aa': 0, 'Ab': 0, 'Ca': 0,
'Cb': 0, 'Ra': 0, 'Rb': 0
}
for seq in sequences:
if seq in pattern_counts:
pattern_counts[seq] += 1
total_sequences = len(sequences)
# 生成统计数据
sequence_stats = {
"summary": {
"total_sequences": total_sequences,
"normal_sequences": sum(pattern_counts.values()), # 正常步序总数
"abnormal_sequences": total_sequences - sum(pattern_counts.values()) # 异常步序数
},
"patterns": [
{
"pattern_code": pattern,
"count": count,
"percentage": round((count / total_sequences * 100), 1) if total_sequences > 0 else 0
}
for pattern, count in pattern_counts.items()
],
"raw_sequence": sequences # 原始步序列表
}
print("步序统计数据生成完成!")
return sequence_stats
def generate_foot_spacing_table(self) -> dict:
"""生成足间距数据统计表格
Returns:
dict: 包含足间距统计数据的字典
"""
print("\n开始生成足间距统计数据...")
# 获取比例尺 (pixels to cm)
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152)
# 按爪子类型分组并按时间排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append(area)
# 计算每个爪子的中心点坐标
def get_center_point(footprint):
x = footprint.position['x'] + footprint.position['width'] / 2
y = footprint.position['y'] + footprint.position['height'] / 2
return (x, y)
# 计算两点之间的距离
def calculate_distance(p1, p2):
return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2) * scale_factor
# 初始化距离统计
spacing_stats = {
"front_paws": {
"count": 0,
"mean_spacing": 0,
"distances": []
},
"hind_paws": {
"count": 0,
"mean_spacing": 0,
"distances": []
},
"paw_pairs": {
"RF": {
"count": 0,
"mean_spacing": 0,
"standard_deviation": 0,
"distances": []
},
"LF": {
"count": 0,
"mean_spacing": 0,
"standard_deviation": 0,
"distances": []
},
"LH": {
"count": 0,
"mean_spacing": 0,
"standard_deviation": 0,
"distances": []
},
"RH": {
"count": 0,
"mean_spacing": 0,
"standard_deviation": 0,
"distances": []
}
}
}
# 计算前爪之间的距离
if len(paw_groups['RF']) > 0 and len(paw_groups['LF']) > 0:
for rf in paw_groups['RF']:
rf_center = get_center_point(rf)
for lf in paw_groups['LF']:
lf_center = get_center_point(lf)
distance = calculate_distance(rf_center, lf_center)
spacing_stats["front_paws"]["distances"].append(distance)
# 计算后爪之间的距离
if len(paw_groups['RH']) > 0 and len(paw_groups['LH']) > 0:
for rh in paw_groups['RH']:
rh_center = get_center_point(rh)
for lh in paw_groups['LH']:
lh_center = get_center_point(lh)
distance = calculate_distance(rh_center, lh_center)
spacing_stats["hind_paws"]["distances"].append(distance)
# 计算各爪子对之间的距离
for paw_type in ['RF', 'LF', 'LH', 'RH']:
if len(paw_groups[paw_type]) > 1:
for i in range(len(paw_groups[paw_type]) - 1):
current = paw_groups[paw_type][i]
next_paw = paw_groups[paw_type][i + 1]
distance = calculate_distance(
get_center_point(current),
get_center_point(next_paw)
)
spacing_stats["paw_pairs"][paw_type]["distances"].append(distance)
# 计算统计数据
# 前爪统计
if spacing_stats["front_paws"]["distances"]:
distances = spacing_stats["front_paws"]["distances"]
spacing_stats["front_paws"].update({
"count": len(distances),
"mean_spacing": round(np.mean(distances), 1)
})
# 后爪统计
if spacing_stats["hind_paws"]["distances"]:
distances = spacing_stats["hind_paws"]["distances"]
spacing_stats["hind_paws"].update({
"count": len(distances),
"mean_spacing": round(np.mean(distances), 1)
})
# 各爪子对统计
for paw_type in ['RF', 'LF', 'LH', 'RH']:
if spacing_stats["paw_pairs"][paw_type]["distances"]:
distances = spacing_stats["paw_pairs"][paw_type]["distances"]
mean_spacing = np.mean(distances)
std_dev = np.std(distances)
spacing_stats["paw_pairs"][paw_type].update({
"count": len(distances),
"mean_spacing": round(mean_spacing, 1),
"standard_deviation": round((std_dev / mean_spacing) * 100, 1) # 转换为百分比
})
print("足间距统计数据生成完成!")
return spacing_stats
def generate_support_table(self) -> dict:
"""生成支撑统计数据(带中英文字段)"""
# 中英对照表
support_type_mapping = {
"diagonal": "对角支撑", # 对侧交叉支撑(如II-DH)
"four": "四肢支撑", # 四肢同时支撑
"girdle": "同源支撑", # 同源双肢支撑(如II-GR)
"lateral": "同侧支撑", # 同侧双肢支撑(如II-LR)
"single": "单肢支撑", # 单肢独立支撑
"standing": "站立支撑", # 静止站立状态
"three": "三肢支撑", # 三肢同时支撑
"zero": "无支撑" # 无任何肢体支撑
}
support_stats = {
"support_sequence": [],
"support_types": {
"diagonal": 0.0,
"four": 0.0,
"girdle": 0.0,
"lateral": 0.0,
"single": 0.0,
"standing": 0.0,
"three": 0.0,
"zero": 0.0
}
}
# 修复时间属性访问方式
all_footprints = sorted(
[{
'start_time': area.start_frame / self.fps, # 正确计算开始时间
'end_time': area.end_frame / self.fps, # 新增结束时间
'duration': (area.end_frame - area.start_frame) / self.fps,
'paw_type': area.paw_type
} for area in self.footprint_areas],
key=lambda x: x['start_time']
)
# 分析每个时间点的支撑情况
for fp in all_footprints:
# 获取当前所有活动爪子
active_paws = [
p['paw_type'] for p in all_footprints
if p['start_time'] <= fp['start_time'] < (p['start_time'] + p['duration'])
]
support_count = len(active_paws)
# 生成SupportFormula (1-4)
support_formula = str(min(support_count, 4)) # 最大显示为4
# 生成FootfallFormula
if support_count == 1:
# 单肢模式 I-{爪子类型}
footfall_code = f"I-{active_paws[0]}"
elif support_count == 2:
# 双肢模式 II-{类型}{组合}
paw1, paw2 = sorted(active_paws)
# 判断支撑类型
if (paw1[0] != paw2[0]) and (paw1[1] != paw2[1]): # 对侧交叉
type_code = "D"
pos_code = paw1[1]+paw2[1] # 取前后位置 F/H
elif paw1[0] == paw2[0]: # 同源支撑
type_code = "G"
pos_code = paw1[0] # 取左右侧 R/L
else: # 同侧支撑
type_code = "L"
pos_code = paw1[0] # 取左右侧 R/L
footfall_code = f"II-{type_code}{pos_code}"
elif support_count == 3:
# 三肢模式 III-{主导爪}
lead_paw = min(active_paws) # 按字母顺序取第一个
footfall_code = f"III-{lead_paw}"
else:
footfall_code = "IV"
# 记录支撑序列
support_stats["support_sequence"].append({
"start_time": round(fp['start_time'], 4),
"duration": round(fp['duration'], 4),
"support_formula": support_formula,
"footfall_formula": footfall_code
})
# 初始化支撑类型
support_type = "unknown"
# 根据footfall_code判断支撑类型
if support_count == 0:
support_type = "zero"
elif support_count == 1:
support_type = "single"
elif support_count == 2:
if footfall_code.startswith("II-D"):
support_type = "diagonal"
elif footfall_code.startswith("II-G"):
support_type = "girdle"
elif footfall_code.startswith("II-L"):
support_type = "lateral"
else:
support_type = "unknown_dual" # 未知双肢类型
elif support_count == 3:
support_type = "three"
elif support_count == 4:
support_type = "four"
# 覆盖判断站立状态
if self._is_standing_state(fp['start_time']):
support_type = "standing"
# 统计持续时间
if support_type in support_stats["support_types"]:
support_stats["support_types"][support_type] += fp['duration']
else:
logging.warning(f"未知支撑类型: {support_type} 时间: {fp['start_time']}")
continue
# 计算百分比(如果需要)
total_duration = sum(support_stats["support_types"].values())
if total_duration > 0:
for k in support_stats["support_types"]:
support_stats["support_types"][k] = round(
(support_stats["support_types"][k] / total_duration) * 100, 4
)
print("足支撑统计数据生成完成!")
return support_stats
def generate_coordination_table(self) -> dict:
"""生成双足协调性数据统计表格
Returns:
dict: 包含双足协调性统计数据的字典
"""
print("\n开始生成双足协调性统计数据...")
# 初始化协调性数据结构
coordination_stats = {
"summary": {
"standard_mean": 0,
"standard_deviation": 0,
"mistakes": 0
},
"pairs": {
"LF-RH": [], # 左前-右后
"LF-LH": [], # 左前-左后
"LF-RF": [], # 左前-右前
"RH-LF": [], # 右后-左前
"RH-LH": [], # 右后-左后
"RH-RF": [] # 右后-右前
},
"coordination_sequence": [] # 详细的协调序列
}
# 按爪子类型分组并按时间排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
if area.paw_type in paw_groups:
paw_groups[area.paw_type].append({
'paw_type': area.paw_type,
'start_time': area.start_frame / self.fps,
'duration': (area.end_frame - area.start_frame) / self.fps,
'end_time': area.end_frame / self.fps
})
# 对每个组内的足印按时间排序
for paw_type in paw_groups:
paw_groups[paw_type].sort(key=lambda x: x['start_time'])
# 分析爪子对之间的协调性
def analyze_pair_coordination(anchor_prints, target_prints, pair_type):
pair_stats = []
for anchor in anchor_prints:
# 增加有效性检查
if anchor['duration'] <= 0 or anchor['start_time'] < 0:
continue
# 寻找时间窗口内的目标足印
valid_targets = [
t for t in target_prints
if t['start_time'] >= anchor['start_time'] - 0.5 # 扩大时间窗口
and t['start_time'] <= anchor['end_time'] + 0.5
]
for target in valid_targets:
# 修正计算方式(取消绝对值)
time_diff = target['start_time'] - anchor['start_time']
value = (time_diff / anchor['duration']) * 100
# 修正异常类型判断
if time_diff < 0: # 目标足提前启动
peculiarity = "ANT"
value = abs(value)
elif 0 <= value <= 100:
peculiarity = "NOR"
else: # 超过100%为滞后
peculiarity = "DEL" # 修改异常代码
value = value % 100
# 添加有效性阈值
if abs(time_diff) > 2.0: # 超过2秒视为无效数据
continue
coordination_entry = {
"value": round(value, 4),
"anchor_start": round(anchor['start_time'], 4), # 保持原有名称
"anchor_duration": round(anchor['duration'], 4), # 保持原有名称
"target_start": round(target['start_time'], 4), # 保持原有名称
"peculiarity": peculiarity
}
pair_stats.append(coordination_entry)
break
return pair_stats
# 分析所有配对
pair_configs = [
('LF', 'RH', 'LF-RH'),
('LF', 'LH', 'LF-LH'),
('LF', 'RF', 'LF-RF'),
('RH', 'LF', 'RH-LF'),
('RH', 'LH', 'RH-LH'),
('RH', 'RF', 'RH-RF')
]
all_values = []
mistakes_count = 0
for anchor_type, target_type, pair_name in pair_configs:
pair_stats = analyze_pair_coordination(
paw_groups[anchor_type],
paw_groups[target_type],
pair_name
)
coordination_stats["pairs"][pair_name] = pair_stats
# 收集所有有效值用于计算总体统计
values = [stat["value"] for stat in pair_stats if stat["value"] > 0]
all_values.extend(values)
# 统计异常情况
mistakes_count += sum(1 for stat in pair_stats if stat["peculiarity"] != "NOR")
# 计算总体统计数据
if all_values:
coordination_stats["summary"].update({
"standard_mean": round(np.mean(all_values), 0),
"standard_deviation": round(np.std(all_values), 0),
"mistakes": mistakes_count
})
print("双足协调性统计数据生成完成!")
return coordination_stats
def _is_standing_state(self, current_time: float) -> bool:
"""修复后的站立状态判断"""
# 转换帧号为时间
start_time = current_time - 0.5
hind_prints = [
p for p in self.footprint_areas
if p.paw_type in ('LH', 'RH') and
(p.start_frame / self.fps) <= current_time <= (p.end_frame / self.fps) and
(p.start_frame / self.fps) >= start_time
]
# 持续时间计算修正
return (
len(hind_prints) == 2 and
all((p.end_frame - p.start_frame)/self.fps >= 0.5 for p in hind_prints)
)
def generate_3d_footprint_analysis(self):
"""生成3D足印分析数据"""
print("\n开始生成3D足印可视化...")
# 创建保存目录
footprint_3d_dir = f'{self.result_dir}/plots/footprints_3d'
os.makedirs(footprint_3d_dir, exist_ok=True)
html_dir = f'{self.result_dir}/plots/interactive_3d'
os.makedirs(html_dir, exist_ok=True)
# 生成静态3D图像
self._generate_3d_footprint_plots()
# 生成交互式3D足印可视化
interactive_3d_data = self._generate_interactive_3d_footprints()
# 创建索引页面
self._create_3d_footprint_index(html_dir)
return interactive_3d_data
def _generate_3d_footprint_plots(self):
"""为每个足印区域生成3D图像"""
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import os
# 创建保存目录
footprint_3d_dir = f'{self.result_dir}/plots/footprints_3d'
os.makedirs(footprint_3d_dir, exist_ok=True)
# 按cluster(area_id)分组足印
cluster_groups = {}
for footprint in self.footprint_areas:
if footprint.area_id not in cluster_groups:
cluster_groups[footprint.area_id] = []
cluster_groups[footprint.area_id].append(footprint)
# 处理每个cluster
for cluster_id, footprints in cluster_groups.items():
# 按时间排序并选择中位数时间点的足印作为关键足印
sorted_footprints = sorted(footprints, key=lambda x: x.start_frame)
key_footprint = sorted_footprints[len(sorted_footprints)//2]
# 打开视频并提取足印图像
if self.video_path:
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
print(f"无法打开视频: {self.video_path}")
continue
# 获取足印中间帧
middle_frame = (key_footprint.start_frame + key_footprint.end_frame) // 2
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
ret, frame = cap.read()
if not ret:
print(f"无法读取帧: {middle_frame}")
cap.release()
continue
# 提取足印ROI
x = max(0, key_footprint.position['x'])
y = max(0, key_footprint.position['y'])
w = key_footprint.position['width']
h = key_footprint.position['height']
if x+w > frame.shape[1] or y+h > frame.shape[0]:
print(f"ROI超出图像范围: {x},{y},{w},{h}")
cap.release()
continue
roi = frame[y:y+h, x:x+w]
# 提取绿色通道作为强度图
green_channel = roi[:,:,1]
# 确定方形区域大小(使用最长边)
max_side = max(green_channel.shape[0], green_channel.shape[1])
square_patch = np.zeros((max_side, max_side), dtype=np.uint8)
# 将原图放在方形中心
start_y = (max_side - green_channel.shape[0]) // 2
start_x = (max_side - green_channel.shape[1]) // 2
square_patch[start_y:start_y+green_channel.shape[0],
start_x:start_x+green_channel.shape[1]] = green_channel
# 创建网格
x = np.linspace(0, square_patch.shape[1]-1, square_patch.shape[1])
y = np.linspace(0, square_patch.shape[0]-1, square_patch.shape[0])
X, Y = np.meshgrid(x, y)
# 创建3D图
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
# 绘制3D表面
surf = ax.plot_surface(X, Y, square_patch, cmap='viridis')
# 设置标题和标签
paw_type = key_footprint.paw_type
ax.set_title(f'Footprint 3D View - {paw_type} #{cluster_id}')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Intensity')
# 添加颜色条
fig.colorbar(surf)
# 保存图像
plt.savefig(f'{footprint_3d_dir}/footprint_{cluster_id}_{paw_type}.png',
dpi=300, bbox_inches='tight')
plt.close()
cap.release()
def _generate_interactive_3d_footprints(self):
"""生成交互式3D足印热图可视化并返回base64编码的HTML内容"""
try:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import os
import cv2
from scipy.interpolate import griddata
except ImportError:
print("请安装必要的库: pip install plotly scipy")
return {}
# 创建保存目录
html_dir = f'{self.result_dir}/plots/interactive_3d'
os.makedirs(html_dir, exist_ok=True)
# 按cluster(area_id)分组足印
cluster_groups = {}
for footprint in self.footprint_areas:
if footprint.area_id not in cluster_groups:
cluster_groups[footprint.area_id] = []
cluster_groups[footprint.area_id].append(footprint)
# 存储HTML内容的字典
html_data = {}
# 处理每个cluster生成单独的HTML
for cluster_id, footprints in cluster_groups.items():
# 按时间排序并选择中位数时间点的足印作为关键足印
sorted_footprints = sorted(footprints, key=lambda x: x.start_frame)
key_footprint = sorted_footprints[len(sorted_footprints)//2]
paw_type = key_footprint.paw_type
html_path = f'{html_dir}/footprint_{cluster_id}_{paw_type}.html'
# 打开视频并提取足印图像
if self.video_path:
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
print(f"无法打开视频: {self.video_path}")
continue
# 获取足印中间帧
middle_frame = (key_footprint.start_frame + key_footprint.end_frame) // 2
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
ret, frame = cap.read()
if not ret:
print(f"无法读取帧: {middle_frame}")
cap.release()
continue
# 提取足印ROI
x = max(0, key_footprint.position['x'])
y = max(0, key_footprint.position['y'])
w = key_footprint.position['width']
h = key_footprint.position['height']
if x+w > frame.shape[1] or y+h > frame.shape[0]:
print(f"ROI超出图像范围: {x},{y},{w},{h}")
cap.release()
continue
roi = frame[y:y+h, x:x+w]
# 检查图像格式并处理
if len(roi.shape) == 2 or (len(roi.shape) == 3 and roi.shape[2] == 1):
# 单通道图像(灰度图)- 直接使用
if len(roi.shape) == 3:
gray = roi[:,:,0]
else:
gray = roi
else:
# 多通道图像(BGR或RGB)- 先提取绿色通道
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
# 定义绿色的HSV范围
lower_green = np.array([40, 40, 40])
upper_green = np.array([80, 255, 255])
# 创建掩码
mask = cv2.inRange(hsv, lower_green, upper_green)
# 应用掩码
green_only = cv2.bitwise_and(roi, roi, mask=mask)
# 转换为灰度图
gray = cv2.cvtColor(green_only, cv2.COLOR_BGR2GRAY)
# 应用高斯模糊平滑过渡
smooth_gray = ndimage.gaussian_filter(gray, sigma=1.5)
# 过滤低值(底色)
threshold = np.max(smooth_gray) * 0.1 if np.max(smooth_gray) > 0 else 0
filtered_gray = np.where(smooth_gray < threshold, 0, smooth_gray)
# 确定方形区域大小(使用最长边)
max_side = max(filtered_gray.shape[0], filtered_gray.shape[1])
square_patch = np.zeros((max_side, max_side), dtype=np.uint8)
# 将原图放在方形中心
start_y = (max_side - filtered_gray.shape[0]) // 2
start_x = (max_side - filtered_gray.shape[1]) // 2
square_patch[start_y:start_y+filtered_gray.shape[0],
start_x:start_x+filtered_gray.shape[1]] = filtered_gray
# 增加分辨率(插值平滑)
factor = 2 # 分辨率提高因子
new_size = max_side * factor
# 原始坐标
y_old, x_old = np.mgrid[0:max_side, 0:max_side]
# 新坐标
y_new, x_new = np.mgrid[0:max_side:complex(0, new_size), 0:max_side:complex(0, new_size)]
# 执行插值
z_upscaled = griddata((y_old.flatten(), x_old.flatten()), square_patch.flatten(),
(y_new, x_new), method='cubic', fill_value=0)
# 再次应用平滑
z_upscaled = ndimage.gaussian_filter(z_upscaled, sigma=1)
# 创建交互式3D图
fig = go.Figure(data=[go.Surface(
z=z_upscaled,
colorscale='Jet', # 热图色彩
colorbar=dict(title="强度"),
contours = {
"z": {"show": True, "start": 0, "end": 255, "size": 10}
}
)])
# 设置图表布局和标题
fig.update_layout(
title=f'足印3D交互式热图 - {paw_type} #{cluster_id}',
width=800,
height=800,
scene=dict(
xaxis_title='X',
yaxis_title='Y',
zaxis_title='强度',
aspectratio=dict(x=1, y=1, z=0.5),
camera=dict(
eye=dict(x=1.5, y=1.5, z=0.8)
)
),
margin=dict(l=0, r=0, b=0, t=30)
)
# 保存为HTML文件
fig.write_html(
html_path,
include_plotlyjs='cdn',
full_html=True,
config={
'displayModeBar': True,
'editable': True,
'toImageButtonOptions': {
'format': 'png',
'filename': f'footprint_{cluster_id}_{paw_type}',
'height': 800,
'width': 800,
'scale': 2
}
}
)
# 读取HTML内容并转换为base64
with open(html_path, 'rb') as f:
html_content = f.read()
html_base64 = base64.b64encode(html_content).decode('utf-8')
# 保存到字典
html_data[f"{cluster_id}_{paw_type}"] = {
'html_base64': html_base64,
'filename': f'footprint_{cluster_id}_{paw_type}.html',
'paw_type': paw_type,
'cluster_id': cluster_id
}
cap.release()
print(f"交互式3D足印热图已保存至: {html_path}")
# 创建索引页面
self._create_3d_footprint_index(html_dir)
return html_data
def _create_3d_footprint_index(self, html_dir):
"""创建3D足印可视化的HTML索引页面"""
html_files = [f for f in os.listdir(html_dir) if f.endswith('.html') and f != 'index.html']
if not html_files:
return
index_html = f'{html_dir}/index.html'
# 按足爪类型分组文件
paw_types = {
'leftFront': [],
'rightFront': [],
'leftHind': [],
'rightHind': [],
'unknown': []
}
type_map = {
'LF': 'leftFront',
'RF': 'rightFront',
'LH': 'leftHind',
'RH': 'rightHind'
}
for filename in html_files:
for paw_code, paw_name in type_map.items():
if paw_code in filename:
paw_types[paw_name].append(filename)
break
else:
paw_types['unknown'].append(filename)
# 生成HTML内容
with open(index_html, 'w', encoding='utf-8') as f:
f.write("""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>足印3D可视化索引</title>
<style>
body { font-family: Arial, sans-serif; line-height: 1.6; margin: 20px; }
h1 { color: #333; text-align: center; }
h2 { color: #555; margin-top: 30px; }
.container { display: flex; flex-wrap: wrap; justify-content: center; }
.item { margin: 10px; text-align: center; }
.item a { display: block; padding: 10px; border: 1px solid #ddd; border-radius: 5px;
text-decoration: none; color: #555; transition: all 0.3s; }
.item a:hover { background-color: #f5f5f5; transform: scale(1.05); }
.leftFront a { border-color: #ff9999; }
.rightFront a { border-color: #99ff99; }
.leftHind a { border-color: #9999ff; }
.rightHind a { border-color: #ffff99; }
</style>
</head>
<body>
<h1>足印3D交互式可视化索引</h1>
""")
# 添加每种足爪类型的文件链接
type_labels = {
'leftFront': '左前爪',
'rightFront': '右前爪',
'leftHind': '左后爪',
'rightHind': '右后爪',
'unknown': '未知类型'
}
for paw_type, files in paw_types.items():
if files:
f.write(f'<h2>{type_labels[paw_type]}</h2>\n')
f.write('<div class="container">\n')
for filename in sorted(files):
# 提取cluster_id
parts = filename.split('_')
if len(parts) >= 2:
cluster_id = parts[1]
f.write(f'<div class="item {paw_type}">\n')
f.write(f'<a href="{filename}" target="_blank">足印 #{cluster_id}</a>\n')
f.write('</div>\n')
f.write('</div>\n')
f.write("""
</body>
</html>
""")
print(f"足印3D可视化索引页面已创建: {index_html}")
# 索引页也转为base64
with open(index_html, 'rb') as f:
html_content = f.read()
index_base64 = base64.b64encode(html_content).decode('utf-8')
return index_base64
def generate_footprint_timeline(self):
"""生成足印步行图分析数据"""
print("\n开始生成足印步行图...")
# 创建保存目录
timeline_dir = f'{self.result_dir}/plots/footprint_timeline'
os.makedirs(timeline_dir, exist_ok=True)
videos_dir = f'{self.result_dir}/videos'
os.makedirs(videos_dir, exist_ok=True)
# 生成足印步行热图视频和图片
timeline_data = self._generate_footprint_timeline_video()
return timeline_data
def _generate_footprint_timeline_video(self):
"""生成足印步行热图视频和图片序列,并返回base64编码的数据"""
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import os
from matplotlib.patches import Rectangle
# 创建保存目录
timeline_dir = f'{self.result_dir}/plots/footprint_timeline'
os.makedirs(timeline_dir, exist_ok=True)
videos_dir = f'{self.result_dir}/videos'
os.makedirs(videos_dir, exist_ok=True)
# 获取视频信息
video_fps = self.fps
# 按足印类型获取颜色
paw_colors = {
'LF': 'red',
'RF': 'green',
'LH': 'blue',
'RH': 'yellow'
}
# 创建热图色彩映射
cmap_jet = plt.cm.get_cmap('jet')
# 按爪子类型分组足印
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
paw_groups[area.paw_type].append(area)
# 将footprint_areas重新组织为键值对形式,便于后续处理
cluster_to_prints = {}
for footprint in self.footprint_areas:
if footprint.area_id not in cluster_to_prints:
cluster_to_prints[footprint.area_id] = []
# 从area_id中提取数字部分作为简化ID
simple_id = footprint.area_id
if footprint.area_id.startswith("footprintArea_"):
simple_id = footprint.area_id.replace("footprintArea_", "")
cluster_to_prints[footprint.area_id].append({
'cluster_id': footprint.area_id,
'simple_id': simple_id, # 存储简化的ID
'paw_type': footprint.paw_type,
'start_frame': footprint.start_frame,
'end_frame': footprint.end_frame,
'position': footprint.position,
'x': footprint.position['x'] + footprint.position['width']/2, # 中心点x
'y': footprint.position['y'] + footprint.position['height']/2, # 中心点y
'w': footprint.position['width'],
'h': footprint.position['height'],
'frame_id': footprint.start_frame # 使用开始帧作为frame_id
})
# 提取每个cluster的关键足印
key_footprints = []
cluster_heatmaps = {}
for cluster_id, prints in cluster_to_prints.items():
# 按时间排序并选择中位数时间点的足印作为关键足印
sorted_prints = sorted(prints, key=lambda x: x['start_frame'])
key_print = sorted_prints[len(sorted_prints)//2]
key_footprints.append(key_print)
# 获取足印图像并处理
if self.video_path:
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
print(f"无法打开视频: {self.video_path}")
continue
# 获取中间帧
middle_frame = (key_print['start_frame'] + key_print['end_frame']) // 2
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
ret, frame = cap.read()
if not ret:
print(f"无法读取帧: {middle_frame}")
cap.release()
continue
# 提取ROI
x = max(0, int(key_print['position']['x']))
y = max(0, int(key_print['position']['y']))
w = int(key_print['position']['width'])
h = int(key_print['position']['height'])
if x+w > frame.shape[1] or y+h > frame.shape[0]:
print(f"ROI超出图像范围: {x},{y},{w},{h}")
cap.release()
continue
patch = frame[y:y+h, x:x+w]
# 检查图像格式并处理
if len(patch.shape) == 2 or (len(patch.shape) == 3 and patch.shape[2] == 1):
# 单通道图像(灰度图)- 直接使用
if len(patch.shape) == 3:
gray = patch[:,:,0] # 如果是形状为(h,w,1),提取为(h,w)
else:
gray = patch
else:
# 多通道图像(BGR或RGB)- 先提取绿色通道
hsv = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
# 定义绿色的HSV范围
lower_green = np.array([40, 40, 40])
upper_green = np.array([80, 255, 255])
# 创建掩码
mask = cv2.inRange(hsv, lower_green, upper_green)
# 应用掩码
green_only = cv2.bitwise_and(patch, patch, mask=mask)
# 转换为灰度图
gray = cv2.cvtColor(green_only, cv2.COLOR_BGR2GRAY)
# 应用高斯模糊平滑过渡
smooth_gray = ndimage.gaussian_filter(gray, sigma=1.0)
# 过滤低值(底色)
threshold = np.max(smooth_gray) * 0.1 if np.max(smooth_gray) > 0 else 0
filtered_gray = np.where(smooth_gray < threshold, 0, smooth_gray)
# 标准化为0-1范围
max_val = np.max(filtered_gray)
if max_val > 0:
norm_gray = filtered_gray / max_val
else:
norm_gray = filtered_gray
# 将灰度图转换为热图
heatmap = cmap_jet(norm_gray)
# 转换为BGR格式(OpenCV使用BGR)
heatmap_bgr = (heatmap[:,:,:3][:,:,::-1] * 255).astype(np.uint8)
# 添加透明度通道
alpha = np.where(norm_gray > 0, 0.7, 0).reshape(norm_gray.shape[0], norm_gray.shape[1], 1)
heatmap_bgra = np.concatenate([heatmap_bgr, (alpha * 255).astype(np.uint8)], axis=2)
# 保存该cluster的热图
cluster_heatmaps[cluster_id] = {
'heatmap': heatmap_bgra,
'print': key_print
}
cap.release()
# 按时间排序关键足印
key_footprints.sort(key=lambda x: x['start_frame'])
# 获取帧范围
first_frame = min(p['start_frame'] for p in key_footprints)
last_frame = max(p['end_frame'] for p in key_footprints)
# 计算图像边界(只基于关键足印)
min_x = min(p['position']['x'] for p in key_footprints)
max_x = max(p['position']['x'] + p['position']['width'] for p in key_footprints)
min_y = min(p['position']['y'] for p in key_footprints)
max_y = max(p['position']['y'] + p['position']['height'] for p in key_footprints)
# 添加边距
margin = 50
min_x = max(0, min_x - margin)
min_y = max(0, min_y - margin)
max_x += margin
max_y += margin
# 计算图像尺寸
width = int(max_x - min_x)
height = int(max_y - min_y)
# 确保尺寸是偶数(视频编码要求)
width = width + (width % 2)
height = height + (height % 2)
# 初始化视频写入器
raw_video_path = f'{videos_dir}/footprint_timeline_raw.mp4'
video_path = f'{videos_dir}/footprint_timeline.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(raw_video_path, fourcc, video_fps, (width, height))
# 按帧ID对足印分组
frame_to_key_prints = {}
for p in key_footprints:
frame_id = p['start_frame']
if frame_id not in frame_to_key_prints:
frame_to_key_prints[frame_id] = []
frame_to_key_prints[frame_id].append(p)
# 保存中间进度的帧
progress_frames = {}
progress_points = [0, 25, 50, 75, 100] # 进度百分比
total_frames = last_frame - first_frame
progress_frame_ids = [first_frame + int(p * total_frames / 100) for p in progress_points]
# 生成每一帧的可视化
for frame_id in range(first_frame, last_frame + 1):
# 创建画布
canvas = np.zeros((height, width, 3), dtype=np.uint8)
canvas.fill(255) # 白色背景
# 绘制已出现的所有关键足印
for f_id in range(first_frame, frame_id + 1):
if f_id in frame_to_key_prints:
for footprint in frame_to_key_prints[f_id]:
cluster_id = footprint['cluster_id']
if cluster_id in cluster_heatmaps:
heatmap_data = cluster_heatmaps[cluster_id]
heatmap = heatmap_data['heatmap']
# 计算在画布上的位置
x = int(footprint['position']['x'] - min_x)
y = int(footprint['position']['y'] - min_y)
w = int(footprint['position']['width'])
h = int(footprint['position']['height'])
# 计算缩放比例
scale_x = w / heatmap.shape[1]
scale_y = h / heatmap.shape[0]
scale = min(scale_x, scale_y)
# 等比例缩放热图
new_w = int(heatmap.shape[1] * scale)
new_h = int(heatmap.shape[0] * scale)
if new_w > 0 and new_h > 0: # 确保尺寸有效
resized_heatmap = cv2.resize(heatmap, (new_w, new_h))
# 计算居中偏移
offset_x = (w - new_w) // 2
offset_y = (h - new_h) // 2
# 绘制足印类型和简化ID
paw_type = footprint['paw_type']
simple_id = footprint['simple_id']
text = f"{paw_type} {simple_id}"
cv2.putText(canvas, text, (x + w + 5, y + h//2),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
# 添加热图到画布上
# 处理透明度混合
for i in range(new_h):
for j in range(new_w):
if 0 <= x + j + offset_x < width and 0 <= y + i + offset_y < height:
alpha = resized_heatmap[i, j, 3] / 255.0
if alpha > 0:
canvas[y + i + offset_y, x + j + offset_x] = \
(1 - alpha) * canvas[y + i + offset_y, x + j + offset_x] + \
alpha * resized_heatmap[i, j, :3]
# 添加当前帧信息
cv2.putText(canvas, f"Frame: {frame_id}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
# 保存进度图像
if frame_id in progress_frame_ids:
progress = progress_points[progress_frame_ids.index(frame_id)]
progress_img_path = f'{timeline_dir}/timeline_progress_{progress}.png'
cv2.imwrite(progress_img_path, canvas)
progress_frames[progress] = progress_img_path
# 添加到视频
video_writer.write(canvas)
# 释放视频写入器
video_writer.release()
# 使用ffmpeg处理视频,提高兼容性
try:
print(f"使用ffmpeg处理视频以提高兼容性...")
# 优先使用ffmpeg-python库
if 'ffmpeg' in globals():
(
ffmpeg
.input(raw_video_path)
.output(video_path, vcodec='libx264', crf=23, preset='fast', acodec='aac', audio_bitrate='128k')
.overwrite_output()
.run(quiet=True)
)
else:
# 使用subprocess作为备选方案
command = [
'ffmpeg', '-i', raw_video_path,
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast',
'-c:a', 'aac', '-b:a', '128k',
'-y', video_path
]
subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 删除原始视频
if os.path.exists(video_path) and os.path.getsize(video_path) > 0:
os.remove(raw_video_path)
print(f"视频处理完成: {video_path}")
else:
# 如果处理失败,保留原始视频
shutil.copy(raw_video_path, video_path)
print(f"视频处理失败,使用原始视频")
except Exception as e:
print(f"视频处理失败: {str(e)}")
# 如果处理失败,使用原始视频
if os.path.exists(raw_video_path):
shutil.copy(raw_video_path, video_path)
# 生成累积图像 - 包含所有足印的单张图像
cumulative_img_path = self._generate_cumulative_footprint_image(cluster_heatmaps, min_x, min_y, width, height, key_footprints)
print(f"足印步行热图视频已保存: {video_path}")
print(f"足印步行热图序列已保存: {timeline_dir}")
# 读取视频和图像为base64
video_base64 = ""
with open(video_path, 'rb') as f:
video_base64 = base64.b64encode(f.read()).decode('utf-8')
final_img_base64 = ""
if os.path.exists(cumulative_img_path):
with open(cumulative_img_path, 'rb') as f:
final_img_base64 = base64.b64encode(f.read()).decode('utf-8')
# 返回数据
return {
'video_base64': video_base64,
'final_image_base64': final_img_base64
}
def _generate_cumulative_footprint_image(self, cluster_heatmaps, min_x, min_y, width, height, key_footprints):
"""生成包含所有足印的累积热图图像"""
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
# 创建保存目录
cumulative_dir = f'{self.result_dir}/plots/footprint_cumulative'
os.makedirs(cumulative_dir, exist_ok=True)
# 创建画布
canvas = np.zeros((height, width, 3), dtype=np.uint8)
canvas.fill(255) # 白色背景
# 绘制所有关键足印
for footprint in key_footprints:
cluster_id = footprint['cluster_id']
if cluster_id in cluster_heatmaps:
heatmap_data = cluster_heatmaps[cluster_id]
heatmap = heatmap_data['heatmap']
# 计算在画布上的位置
x = int(footprint['position']['x'] - min_x)
y = int(footprint['position']['y'] - min_y)
w = int(footprint['position']['width'])
h = int(footprint['position']['height'])
# 计算缩放比例
scale_x = w / heatmap.shape[1]
scale_y = h / heatmap.shape[0]
scale = min(scale_x, scale_y)
# 等比例缩放热图
new_w = int(heatmap.shape[1] * scale)
new_h = int(heatmap.shape[0] * scale)
if new_w > 0 and new_h > 0: # 确保尺寸有效
resized_heatmap = cv2.resize(heatmap, (new_w, new_h))
# 计算居中偏移
offset_x = (w - new_w) // 2
offset_y = (h - new_h) // 2
# 绘制足印类型和简化ID
paw_type = footprint['paw_type']
simple_id = footprint['simple_id']
text = f"{paw_type} {simple_id}"
cv2.putText(canvas, text, (x + w + 5, y + h//2),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
# 添加热图到画布上(透明度混合)
for i in range(new_h):
for j in range(new_w):
if 0 <= x + j + offset_x < width and 0 <= y + i + offset_y < height:
alpha = resized_heatmap[i, j, 3] / 255.0
if alpha > 0:
canvas[y + i + offset_y, x + j + offset_x] = \
(1 - alpha) * canvas[y + i + offset_y, x + j + offset_x] + \
alpha * resized_heatmap[i, j, :3]
# 添加标题
cv2.putText(canvas, "All Gait Cumulative", (width//2 - 150, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
# 保存高清图像
cumulative_path = f'{cumulative_dir}/all_footprints_cumulative.png'
cv2.imwrite(cumulative_path, canvas)
# 创建足印图例
self._create_footprint_legend(cumulative_dir)
print(f"足印累积热图已保存: {cumulative_path}")
return cumulative_path
def _create_footprint_legend(self, output_dir):
"""创建足印类型图例"""
import matplotlib.pyplot as plt
import numpy as np
import os
from matplotlib.patches import Patch
# 足印类型和颜色(使用英文标签)
paw_types = {
'LF': {'color': 'red', 'label': 'Left Front'},
'RF': {'color': 'green', 'label': 'Right Front'},
'LH': {'color': 'blue', 'label': 'Left Hind'},
'RH': {'color': 'yellow', 'label': 'Right Hind'}
}
# 创建图例图像
fig, ax = plt.subplots(figsize=(10, 4))
ax.axis('off')
# 添加图例元素
legend_elements = []
for paw_type, info in paw_types.items():
legend_elements.append(
Patch(facecolor=info['color'], edgecolor='black', label=f"{paw_type} - {info['label']}")
)
# 创建图例(使用英文)
ax.legend(handles=legend_elements, loc='center', fontsize=14, frameon=True,
title='Footprint Type Legend', title_fontsize=16)
# 设置标题(使用英文)
plt.title('Heatmap colors indicate pressure intensity, border colors indicate footprint type', fontsize=14)
plt.tight_layout()
# 保存图例
legend_path = f"{output_dir}/footprint_legend.png"
plt.savefig(legend_path, dpi=200, bbox_inches='tight')
plt.close()
def analyze_angular_velocity(self):
"""分析并生成触地时间-角速度序列图
根据足印数据和关键点数据,计算身体角速度并与触地时间关联
Returns:
dict: 包含统计数据和图表路径
"""
print("\n开始分析触地时间-角速度序列图...")
# 创建保存目录
plot_dir = f'{self.result_dir}/plots'
os.makedirs(plot_dir, exist_ok=True)
# 根据实际可用数据,从记录参数中获取关键点数据
keypoints_data = self.record_params.get('bodyKeypoints', [])
if len(keypoints_data) == 0:
print("无关键点数据,无法生成角速度图")
return {}
# 按时间排序足印数据
all_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame)
# 计算角速度(基于头部和尾部关键点)
frame_ids = []
angular_velocities = []
body_angles = []
prev_angle = None
prev_frame_id = None
# 设置颜色
colors = {
'LF': 'red',
'RF': 'green',
'LH': 'blue',
'RH': 'yellow'
}
for kp_data in keypoints_data:
frame_id = kp_data['frame_id']
kps = kp_data['keypoints']
# 如果有鼻子和尾根点
if 'nose' in kps and 'tail_base' in kps:
nose = kps['nose']
tail_base = kps['tail_base']
# 计算身体角度
dx = nose[0] - tail_base[0]
dy = nose[1] - tail_base[1]
angle = np.arctan2(dy, dx) * 180 / np.pi
# 存储帧号和角度
body_angles.append((frame_id, angle))
# 计算角速度
if prev_angle is not None and prev_frame_id is not None:
# 处理角度环绕(比如从179度到-179度)
angle_diff = angle - prev_angle
if angle_diff > 180:
angle_diff -= 360
elif angle_diff < -180:
angle_diff += 360
# 计算每秒变化的角度
angular_vel = angle_diff * self.fps / (frame_id - prev_frame_id)
frame_ids.append(frame_id)
angular_velocities.append(angular_vel)
prev_angle = angle
prev_frame_id = frame_id
# 创建图形
plt.figure(figsize=(14, 6))
# 绘制角速度
plt.plot([id / self.fps for id in frame_ids], angular_velocities, 'b-', label='Angular Velocity', alpha=0.7)
# 添加足印触地时间线
max_y = max(abs(min(angular_velocities)), abs(max(angular_velocities))) * 1.1 if angular_velocities else 10
for footprint in all_footprints:
start_time = footprint.start_frame / self.fps
paw_type = footprint.paw_type
plt.axvline(x=start_time, color=colors[paw_type], linestyle='--', alpha=0.6)
plt.text(start_time, max_y * 0.9, paw_type, rotation=90, color=colors[paw_type], alpha=0.8)
# 设置图表
plt.xlabel('Time (seconds)')
plt.ylabel('Angular Velocity (deg/sec)')
plt.title('Footfall Angular Velocity Timeline')
plt.grid(True, alpha=0.3)
plt.legend()
# 保存图像
output_path = f'{plot_dir}/angular_velocity_timeline.png'
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"触地时间-角速度序列图已保存: {output_path}")
# 返回统计数据
stats = {
'max_angular_velocity': max(angular_velocities) if angular_velocities else 0,
'min_angular_velocity': min(angular_velocities) if angular_velocities else 0,
'mean_angular_velocity': np.mean(angular_velocities) if angular_velocities else 0,
'image_path': output_path
}
return stats
def analyze_velocity_timeline(self):
"""分析并生成触地时间速度序列图
根据关键点数据计算线性速度并与触地时间关联
Returns:
dict: 包含统计数据和图表路径
"""
print("\n开始分析触地时间速度序列图...")
# 创建保存目录
plot_dir = f'{self.result_dir}/plots'
os.makedirs(plot_dir, exist_ok=True)
# 根据实际可用数据,从记录参数中获取关键点数据
keypoints_data = self.record_params.get('bodyKeypoints', [])
if len(keypoints_data) == 0:
print("无关键点数据,无法生成速度图")
return {}
# 按时间排序足印数据
all_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame)
# 计算线性速度(基于中心点)
frame_ids = []
velocities = []
prev_pos = None
prev_frame_id = None
# 比例尺:像素到厘米的转换
scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152)
# 设置颜色
colors = {
'LF': 'red',
'RF': 'green',
'LH': 'blue',
'RH': 'yellow'
}
# 从关键点数据计算速度
for kp_data in keypoints_data:
frame_id = kp_data['frame_id']
kps = kp_data['keypoints']
# 使用中心点计算速度
if 'mid' in kps:
mid = kps['mid']
if prev_pos is not None and prev_frame_id is not None:
# 计算距离变化(像素)
dx = mid[0] - prev_pos[0]
dy = mid[1] - prev_pos[1]
distance = np.sqrt(dx**2 + dy**2)
# 转换为厘米
distance_cm = distance * scale_factor
# 计算时间变化(秒)
dt = (frame_id - prev_frame_id) / self.fps
# 计算速度(厘米/秒)
if dt > 0:
velocity = distance_cm / dt
frame_ids.append(frame_id)
velocities.append(velocity)
prev_pos = mid
prev_frame_id = frame_id
# 使用Savitzky-Golay滤波器平滑速度数据
velocities_smooth = velocities.copy() if velocities else []
if len(velocities) > 11: # 确保有足够的数据点
try:
velocities_smooth = savgol_filter(velocities, 11, 3)
except Exception as e:
print(f"平滑处理失败: {str(e)}")
# 创建图形
plt.figure(figsize=(14, 6))
# 绘制速度
if len(velocities_smooth) > 0:
plt.plot([id / self.fps for id in frame_ids], velocities_smooth, 'b-', label='Velocity', alpha=0.7)
# 添加足印触地时间线
max_y = float(np.max(velocities_smooth)) * 1.1
for footprint in all_footprints:
start_time = footprint.start_frame / self.fps
paw_type = footprint.paw_type
plt.axvline(x=start_time, color=colors[paw_type], linestyle='--', alpha=0.6)
plt.text(start_time, max_y * 0.9, paw_type, rotation=90, color=colors[paw_type], alpha=0.8)
else:
print("没有有效的速度数据")
# 设置图表
plt.xlabel('Time (seconds)')
plt.ylabel('Velocity (cm/sec)')
plt.title('Footfall Velocity Timeline')
plt.grid(True, alpha=0.3)
plt.legend()
# 保存图像
output_path = f'{plot_dir}/velocity_timeline.png'
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"触地时间速度序列图已保存: {output_path}")
# 返回统计数据
stats = {}
if len(velocities_smooth) > 0:
stats = {
'max_velocity': float(np.max(velocities_smooth)),
'min_velocity': float(np.min(velocities_smooth)),
'mean_velocity': float(np.mean(velocities_smooth)),
'image_path': output_path
}
return stats
def analyze_tail_lateral_movement(self):
"""分析并生成尾根点侧向移动相位图
根据关键点数据分析尾根点的侧向移动并与脚步相位关联
Returns:
dict: 包含统计数据和图表路径
"""
print("\n开始分析尾根点侧向移动相位图...")
# 创建保存目录
plot_dir = f'{self.result_dir}/plots'
os.makedirs(plot_dir, exist_ok=True)
# 根据实际可用数据,从记录参数中获取关键点数据
keypoints_data = self.record_params.get('bodyKeypoints', [])
if len(keypoints_data) == 0:
print("无关键点数据,无法生成尾根点移动图")
return {}
# 按时间排序足印数据
all_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame)
# 提取尾根点数据
frame_ids = []
tail_lateral_pos = [] # 尾根点侧向位置(相对于运动方向)
# 计算运动方向
# 获取第一个和最后一个鼻子位置,确定整体运动方向
first_nose = None
last_nose = None
for kp_data in keypoints_data:
kps = kp_data['keypoints']
if 'nose' in kps:
if first_nose is None:
first_nose = kps['nose']
last_nose = kps['nose']
if first_nose is None or last_nose is None:
print("无法确定运动方向,无法生成尾根点侧向移动图")
return {}
# 计算整体运动方向
direction_vector = [last_nose[0] - first_nose[0], last_nose[1] - first_nose[1]]
if direction_vector[0] == 0 and direction_vector[1] == 0:
print("无法确定运动方向,无法生成尾根点侧向移动图")
return {}
# 计算运动方向单位向量
direction_mag = np.sqrt(direction_vector[0]**2 + direction_vector[1]**2)
direction_unit = [direction_vector[0]/direction_mag, direction_vector[1]/direction_mag]
# 计算垂直于运动方向的向量
perpendicular_unit = [-direction_unit[1], direction_unit[0]]
# 设置颜色
colors = {
'LF': 'red',
'RF': 'green',
'LH': 'blue',
'RH': 'yellow'
}
# 计算尾根点的侧向位置
for kp_data in keypoints_data:
frame_id = kp_data['frame_id']
kps = kp_data['keypoints']
if 'mid' in kps and 'tail_base' in kps:
mid = kps['mid'] # 身体中心点
tail_base = kps['tail_base'] # 尾根点
# 计算尾根点相对于中心点的向量
relative_vector = [tail_base[0] - mid[0], tail_base[1] - mid[1]]
# 计算侧向投影(点积)
lateral_projection = relative_vector[0] * perpendicular_unit[0] + relative_vector[1] * perpendicular_unit[1]
frame_ids.append(frame_id)
tail_lateral_pos.append(lateral_projection)
# 使用Savitzky-Golay滤波器平滑尾根点位置数据
tail_pos_smooth = tail_lateral_pos.copy() if tail_lateral_pos else []
if len(tail_lateral_pos) > 11: # 确保有足够的数据点
try:
tail_pos_smooth = savgol_filter(tail_lateral_pos, 11, 3)
except Exception as e:
print(f"平滑处理失败: {str(e)}")
# 创建图形
plt.figure(figsize=(14, 6))
# 绘制尾根点侧向位置
if len(tail_pos_smooth) > 0:
plt.plot([id / self.fps for id in frame_ids], tail_pos_smooth, 'b-', label='Tail Lateral Position', alpha=0.7)
# 添加足印触地时间线
y_min = float(np.min(tail_pos_smooth))
y_max = float(np.max(tail_pos_smooth))
for footprint in all_footprints:
start_time = footprint.start_frame / self.fps
paw_type = footprint.paw_type
plt.axvline(x=start_time, color=colors[paw_type], linestyle='--', alpha=0.6)
plt.text(start_time, y_max, paw_type, rotation=90, color=colors[paw_type], alpha=0.8)
else:
print("没有有效的尾根点数据")
# 设置图表
plt.xlabel('Time (seconds)')
plt.ylabel('Tail Lateral Position (pixels)')
plt.title('Tail Lateral Movement Phase')
plt.grid(True, alpha=0.3)
plt.legend()
# 保存图像
output_path = f'{plot_dir}/tail_lateral_movement.png'
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"尾根点侧向移动相位图已保存: {output_path}")
# 返回统计数据
stats = {}
if len(tail_pos_smooth) > 0:
stats = {
'max_lateral_position': float(np.max(tail_pos_smooth)),
'min_lateral_position': float(np.min(tail_pos_smooth)),
'mean_lateral_position': float(np.mean(tail_pos_smooth)),
'image_path': output_path
}
return stats
def analyze_support_swing_phase(self):
"""分析并生成支撑-摇摆相位图
可视化每个爪子的支撑和摇摆相位
Returns:
dict: 包含统计数据和图表路径
"""
print("\n开始分析支撑-摇摆相位图...")
# 创建保存目录
plot_dir = f'{self.result_dir}/plots'
os.makedirs(plot_dir, exist_ok=True)
# 按爪子类型分组并按开始帧排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
paw_groups[area.paw_type].append(area)
for paw_type in paw_groups:
paw_groups[paw_type].sort(key=lambda x: x.start_frame)
# 设置颜色
colors = {
'LF': 'red',
'RF': 'green',
'LH': 'blue',
'RH': 'yellow'
}
# 计算整体时间范围
all_footprints = []
for paw_type, footprints in paw_groups.items():
all_footprints.extend(footprints)
if not all_footprints:
print("无足印数据,无法生成支撑-摇摆相位图")
return {}
# 获取时间范围
time_min = min(fp.start_frame for fp in all_footprints) / self.fps
time_max = max(fp.end_frame for fp in all_footprints) / self.fps
# 添加10%的边界
time_range = time_max - time_min
time_min -= time_range * 0.1
time_max += time_range * 0.1
# 创建图形
plt.figure(figsize=(15, 6))
# 设置Y轴位置
y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1}
# 为每个爪子绘制支撑(stance)和摇摆(swing)阶段
for paw_type, footprints in paw_groups.items():
y_pos = y_positions[paw_type]
for i, fp in enumerate(footprints):
# 支撑阶段
stance_start = fp.start_frame / self.fps
stance_end = fp.end_frame / self.fps
plt.hlines(y=y_pos, xmin=stance_start, xmax=stance_end,
linewidth=10, color=colors[paw_type], alpha=0.7, label=paw_type if i==0 else "")
# 添加文本标签
plt.text(stance_start, y_pos+0.1, f"{paw_type}", fontsize=8, ha='left')
# 如果有下一个足印,绘制摇摆阶段
if i < len(footprints) - 1:
next_fp = footprints[i+1]
swing_start = stance_end
swing_end = next_fp.start_frame / self.fps
# 摇摆阶段(用虚线)
plt.hlines(y=y_pos, xmin=swing_start, xmax=swing_end,
linewidth=5, color=colors[paw_type], alpha=0.3, linestyle='--')
# 设置图表属性
plt.yticks(list(y_positions.values()), list(y_positions.keys()))
plt.xlabel('Time (seconds)')
plt.title('Support-Swing Phase Diagram')
plt.xlim(time_min, time_max)
plt.grid(True, alpha=0.3)
# 添加支撑和摇摆的图例
from matplotlib.lines import Line2D
legend_elements = [
Line2D([0], [0], color='black', linewidth=10, alpha=0.7, label='Support Phase'),
Line2D([0], [0], color='black', linewidth=5, alpha=0.3, linestyle='--', label='Swing Phase')
]
plt.legend(handles=legend_elements, loc='upper right')
# 保存图像
output_path = f'{plot_dir}/support_swing_phase.png'
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"支撑-摇摆相位图已保存: {output_path}")
# 返回相位统计数据
phase_stats = {}
for paw_type, footprints in paw_groups.items():
# 计算每个爪子的支撑和摇摆统计数据
stance_times = []
swing_times = []
duty_factors = [] # 占空比 = 支撑时间 / 周期时间
for i, fp in enumerate(footprints):
stance_time = (fp.end_frame - fp.start_frame) / self.fps
stance_times.append(stance_time)
# 如果有下一个足印,计算摇摆时间和占空比
if i < len(footprints) - 1:
next_fp = footprints[i+1]
swing_time = (next_fp.start_frame - fp.end_frame) / self.fps
swing_times.append(swing_time)
cycle_time = stance_time + swing_time
duty_factor = stance_time / cycle_time if cycle_time > 0 else 0
duty_factors.append(duty_factor)
# 存储统计数据
phase_stats[paw_type] = {
'avg_stance_time': np.mean(stance_times) if stance_times else 0,
'avg_swing_time': np.mean(swing_times) if swing_times else 0,
'avg_duty_factor': np.mean(duty_factors) if duty_factors else 0,
'max_duty_factor': max(duty_factors) if duty_factors else 0,
'min_duty_factor': min(duty_factors) if duty_factors else 0
}
return {
'phase_stats': phase_stats,
'image_path': output_path
}
def analyze_limb_duty_cycle(self):
"""分析并生成肢体占空比图 (Duty Cycle)
计算并可视化各肢体的占空比(支撑时间/周期时间)
Returns:
dict: 包含统计数据和图表路径
"""
print("\n开始分析肢体占空比图...")
# 创建保存目录
plot_dir = f'{self.result_dir}/plots'
os.makedirs(plot_dir, exist_ok=True)
# 按爪子类型分组并按开始帧排序
paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []}
for area in self.footprint_areas:
paw_groups[area.paw_type].append(area)
for paw_type in paw_groups:
paw_groups[paw_type].sort(key=lambda x: x.start_frame)
# 计算每个爪子的周期时间和占空比
duty_cycle_data = {}
cycle_times = {}
for paw_type, footprints in paw_groups.items():
duty_cycles = []
stance_times = []
cycles = []
for i, fp in enumerate(footprints):
# 支撑时间(秒)
stance_time = (fp.end_frame - fp.start_frame) / self.fps
stance_times.append(stance_time)
# 如果有下一个足印,计算一个完整周期
if i < len(footprints) - 1:
next_fp = footprints[i+1]
cycle_time = (next_fp.start_frame - fp.start_frame) / self.fps
duty_cycle = stance_time / cycle_time if cycle_time > 0 else 0
duty_cycles.append(duty_cycle)
cycles.append(cycle_time)
duty_cycle_data[paw_type] = duty_cycles
cycle_times[paw_type] = cycles
# 创建图形
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
# 1. 占空比箱线图
paw_order = ['RF', 'LF', 'RH', 'LH']
box_data = [duty_cycle_data[paw] for paw in paw_order]
box_colors = [self.colors[paw] for paw in paw_order]
# 绘制箱线图
boxplots = ax1.boxplot(box_data, patch_artist=True, labels=paw_order)
# 设置箱线图颜色
for patch, color in zip(boxplots['boxes'], box_colors):
patch.set_facecolor(color)
patch.set_alpha(0.6)
# 添加数据点
for i, (paw, data) in enumerate(zip(paw_order, box_data)):
if data: # 确保有数据
x = np.random.normal(i+1, 0.04, size=len(data))
ax1.scatter(x, data, alpha=0.6, color=self.colors[paw], s=30)
ax1.set_title('Limb Duty Cycle Distribution')
ax1.set_ylabel('Duty Cycle (Stance/Cycle)')
ax1.set_ylim(0, 1)
ax1.grid(True, alpha=0.3)
# 2. 周期时间条形图
means = [np.mean(cycle_times[paw]) if cycle_times[paw] else 0 for paw in paw_order]
stds = [np.std(cycle_times[paw]) if cycle_times[paw] else 0 for paw in paw_order]
# 绘制条形图
bars = ax2.bar(paw_order, means, color=box_colors, alpha=0.7)
ax2.errorbar(paw_order, means, yerr=stds, fmt='none', ecolor='black', capsize=5)
# 添加数值标签
for bar, mean in zip(bars, means):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height + 0.02,
f'{mean:.2f}s', ha='center', va='bottom')
ax2.set_title('Average Cycle Time')
ax2.set_ylabel('Time (seconds)')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
# 保存图像
output_path = f'{plot_dir}/limb_duty_cycle.png'
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"肢体占空比图已保存: {output_path}")
# 计算并返回统计数据
stats = {}
for paw_type in paw_order:
duty_cycles = duty_cycle_data[paw_type]
stats[paw_type] = {
'mean_duty_cycle': float(np.mean(duty_cycles)) if duty_cycles else 0,
'std_duty_cycle': float(np.std(duty_cycles)) if duty_cycles else 0,
'max_duty_cycle': float(max(duty_cycles)) if duty_cycles else 0,
'min_duty_cycle': float(min(duty_cycles)) if duty_cycles else 0,
'mean_cycle_time': float(np.mean(cycle_times[paw_type])) if cycle_times[paw_type] else 0
}
return {
'duty_cycle_stats': stats,
'image_path': output_path
}
def main():
# 创建分析器实例
analyzer = GaitAnalysisReport('data/footprint_fixed_Exp2.json',
pre_delay_ms=0,
post_delay_ms=0)
# 分析支撑时间
stance_stats = analyzer.analyze_stance_time()
# 分析摆动指标
swing_stats = analyzer.analyze_swing_metrics()
# 分析步幅长度
stride_stats = analyzer.analyze_stride_length()
# 分析步态序列
analyzer.analyze_gait_sequence()
# 分析压力时序
analyzer.analyze_pressure_timeline()
# 分析触地宽度
stance_width_stats = analyzer.analyze_stance_width()
# 分析脚爪面积
paw_area_stats = analyzer.analyze_paw_area()
# 打印统计数据
print("\n=== 支撑时间统计 (秒) ===")
for paw_type, data in stance_stats['stance_time'].items():
print(f"\n{paw_type}:")
print(f" 平均值: {data['mean']:.3f} ± {data['std']:.3f}")
print(f" 范围: {data['min']:.3f} - {data['max']:.3f}")
print("\n=== 摆动时长统计 (秒) ===")
for paw_type, data in swing_stats['swing_duration'].items():
print(f"\n{paw_type}:")
print(f" 平均值: {data['mean']:.3f} ± {data['std']:.3f}")
print(f" 范围: {data['min']:.3f} - {data['max']:.3f}")
print("\n=== 摆动百分比统计 (%) ===")
for paw_type, data in swing_stats['swing_percentage'].items():
print(f"\n{paw_type}:")
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}")
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}")
print("\n=== 步幅长度统计 (像素) ===")
for paw_type, data in stride_stats.items():
print(f"\n{paw_type}:")
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}")
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}")
print("\n=== 触地宽度统计 (像素) ===")
for limb_type, data in stance_width_stats.items():
if data: # 确保有数据
print(f"\n{limb_type}:")
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}")
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}")
print("\n=== 脚爪面积统计 (像素²) ===")
for paw_type, data in paw_area_stats.items():
print(f"\n{paw_type}:")
print(f" 平均值: {data['mean']:.1f} ± {data['std']:.1f}")
print(f" 范围: {data['min']:.1f} - {data['max']:.1f}")
# 生成详细数据表
detailed_data = analyzer.generate_detailed_table()
if __name__ == "__main__":
main()