import json import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from dataclasses import dataclass from typing import List, Dict, Tuple, Optional from datetime import datetime import os import cv2 import base64 from scipy.signal import savgol_filter import logging import io import scipy.ndimage as ndimage import shutil import re import subprocess import tempfile try: import ffmpeg except ImportError: print("ffmpeg-python not installed. Using subprocess for video processing.") @dataclass class FootprintArea: """足印区域数据类""" area_id: str paw_type: str start_frame: int end_frame: int position: Dict[str, int] class GaitAnalysisReport: def __init__(self, json_path: str, pre_delay_ms: int = 0, post_delay_ms: int = 0, video_path: str = None, record_params: dict = None): """初始化报告生成器 Args: json_path: 足印数据JSON文件路径 pre_delay_ms: 足印前的延时提取时间(毫秒) post_delay_ms: 足印后的延时提取时间(毫秒) video_path: 视频文件路径 record_params: 记录参数,包含实验记录相关信息 """ self.json_path = json_path self.pre_delay_ms = pre_delay_ms self.post_delay_ms = post_delay_ms self.video_path = video_path self.record_params = record_params or {} self.fps = 120 # 视频帧率 self.pre_delay_frames = int(pre_delay_ms / 1000 * self.fps) # 转换为帧数 self.post_delay_frames = int(post_delay_ms / 1000 * self.fps) # 转换为帧数 self.result_dir = self._create_result_dir() self.footprint_areas = [] # 定义统一的颜色方案 self.colors = { 'RF': '#2196F3', # 蓝色 'RH': '#9C27B0', # 紫色 'LF': '#FFD700', # 黄色 'LH': '#4CAF50' # 绿色 } self._load_data() def _create_result_dir(self) -> str: """创建结果目录""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") result_dir = f"results/analysis_{timestamp}" # 创建目录结构 for subdir in ['data', 'plots']: os.makedirs(f"{result_dir}/{subdir}", exist_ok=True) return result_dir def _load_data(self): """加载并预处理JSON数据""" with open(self.json_path, 'r') as f: data = json.load(f) # 按 footprintAreaId 分组收集所有足印 area_prints = {} # footprintAreaId -> list of footprints # 遍历所有帧 for frame in data['frames']: frame_id = frame['frameId'] for footprint in frame['footprints']: area_id = footprint['footprintAreaId'] if area_id not in area_prints: area_prints[area_id] = [] area_prints[area_id].append({ 'frame_id': frame_id, 'position': footprint['position'], 'type': footprint['type'] }) # 为每个足印区域计算边界框和时间范围 for area_id, prints in area_prints.items(): # 收集所有坐标 x_coords = [] y_coords = [] frame_ids = [] paw_type = prints[0]['type'] # 取第一个足印的类型 for p in prints: x_coords.extend([ p['position']['x'], p['position']['x'] + p['position']['width'] ]) y_coords.extend([ p['position']['y'], p['position']['y'] + p['position']['height'] ]) frame_ids.append(p['frame_id']) # 计算边界框 x_min = min(x_coords) y_min = min(y_coords) width = max(x_coords) - x_min height = max(y_coords) - y_min # 添加到足印区域列表 self.footprint_areas.append( FootprintArea( area_id=area_id, paw_type=paw_type, start_frame=min(frame_ids), end_frame=max(frame_ids), position={ 'x': x_min, 'y': y_min, 'width': width, 'height': height } ) ) def analyze_stance_time(self): """分析足印支撑时间并生成可视化""" print("\n开始分析足印支撑时间...") # 1. 按爪子类型分组并按开始帧排序 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append(area) for paw_type in paw_groups: paw_groups[paw_type].sort(key=lambda x: x.start_frame) # 2. 计算支撑时间和周期 stance_times = {paw_type: [] for paw_type in paw_groups} cycle_times = {paw_type: [] for paw_type in paw_groups} for paw_type, footprints in paw_groups.items(): for i, footprint in enumerate(footprints): # 计算支撑时间(帧数转换为秒) stance_time = (footprint.end_frame - footprint.start_frame) / self.fps stance_times[paw_type].append(stance_time) # 计算周期时间(相邻两个同类型足印之间的时间) if i < len(footprints) - 1: cycle_time = (footprints[i+1].start_frame - footprint.start_frame) / self.fps cycle_times[paw_type].append(cycle_time) # 3. 计算统计数据 stats = { 'stance_time': { paw_type: { 'mean': np.mean(times), 'std': np.std(times), 'min': np.min(times), 'max': np.max(times) } for paw_type, times in stance_times.items() }, 'cycle_time': { paw_type: { 'mean': np.mean(times), 'std': np.std(times), 'min': np.min(times), 'max': np.max(times) } for paw_type, times in cycle_times.items() if times } } # 4. 绘制支撑时序图 self._plot_stance_timeline(paw_groups) # 5. 绘制支撑时间统计图 self._plot_stance_stats(stance_times) # 6. 保存统计数据 self._save_stance_stats(stats) print("足印支撑时间分析完成!") return stats def _get_time_range(self) -> Tuple[float, float]: """获取所有足印的时间范围,并添加边距""" start_frames = [area.start_frame for area in self.footprint_areas] end_frames = [area.end_frame for area in self.footprint_areas] # 转换为秒并添加10%的边距 time_min = min(start_frames) / self.fps time_max = max(end_frames) / self.fps time_range = time_max - time_min padding = time_range * 0.1 # 10%的边距 return max(0, time_min - padding), time_max + padding def _plot_stance_timeline(self, paw_groups: Dict[str, List[FootprintArea]]): """绘制支撑时序图""" plt.figure(figsize=(15, 6)) # 获取时间范围 time_min, time_max = self._get_time_range() # 设置X轴范围 plt.xlim(time_min, time_max) # 设置Y轴位置 y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1} # 使用新的颜色方案绘制所有支撑条 for paw_type, footprints in paw_groups.items(): y = y_positions[paw_type] for footprint in footprints: start_time = footprint.start_frame / self.fps end_time = footprint.end_frame / self.fps plt.hlines(y=y, xmin=start_time, xmax=end_time, color=self.colors[paw_type], linewidth=8, alpha=0.6) plt.yticks(list(y_positions.values()), list(y_positions.keys())) plt.xlabel('Time (seconds)') plt.title('Gait Stance Timeline') plt.grid(True, axis='x', alpha=0.3) # 保存图片 plt.savefig(f'{self.result_dir}/plots/stance_timeline.png', dpi=300, bbox_inches='tight') plt.close() def _plot_stance_stats(self, stance_times: Dict[str, List[float]]): """绘制支撑时间统计图""" plt.figure(figsize=(10, 6)) plt.style.use('seaborn-v0_8-whitegrid') paw_order = ['RF', 'RH', 'LF', 'LH'] box_plot = plt.boxplot([stance_times[paw] for paw in paw_order], tick_labels=paw_order, patch_artist=True, medianprops={'color': 'black'}, flierprops={'marker': 'o', 'markerfacecolor': 'none', 'markersize': 4}) # 使用新的颜色方案 for patch, paw in zip(box_plot['boxes'], paw_order): patch.set_facecolor(self.colors[paw]) patch.set_alpha(0.6) # 设置异常值颜色 for flier in box_plot['fliers']: flier.set_color(self.colors[paw]) # 添加数据点 for i, paw in enumerate(paw_order, 1): plt.scatter([i] * len(stance_times[paw]), stance_times[paw], color=self.colors[paw], alpha=0.4, s=30) # 设置标题和标签 plt.title('Stance Time Distribution by Paw Type', pad=20) plt.xlabel('Paw Type', labelpad=10) plt.ylabel('Stance Time (s)', labelpad=10) # 添加网格线 plt.grid(True, axis='y', linestyle='--', alpha=0.3) # 调整布局 plt.tight_layout() # 保存图片 plt.savefig(f'{self.result_dir}/plots/stance_stats.png', dpi=300, bbox_inches='tight') plt.close() def _save_stance_stats(self, stats: Dict): """保存统计数据到CSV""" # 创建支撑时间数据框 stance_df = pd.DataFrame.from_dict( {(i, j): stats['stance_time'][i][j] for i in stats['stance_time'].keys() for j in stats['stance_time'][i].keys()}, orient='index' ) # 创建周期时间数据框 cycle_df = pd.DataFrame.from_dict( {(i, j): stats['cycle_time'][i][j] for i in stats['cycle_time'].keys() for j in stats['cycle_time'][i].keys()}, orient='index' ) # 设置时间相关数据的精度为3位小数 stance_df = stance_df.round(3) cycle_df = cycle_df.round(3) # 保存到CSV stance_df.to_csv(f'{self.result_dir}/data/stance_time_stats.csv') cycle_df.to_csv(f'{self.result_dir}/data/cycle_time_stats.csv') def analyze_gait_sequence(self): """分析步态序列并绘制示意图""" print("\n开始分析步态序列...") # 定义步态模式 GAIT_PATTERNS = { 'LH-RF-RH-LF': 'Aa', 'LH-LF-RH-RF': 'Ab', 'LH-RF-LF-RH': 'Ca', 'LH-RH-LF-RF': 'Cb', 'LH-RH-RF-LF': 'Ra', 'LH-LF-RF-RH': 'Rb' } # 1. 收集所有足印的开始时间并按时间排序 all_footprints = [] for area in self.footprint_areas: all_footprints.append({ 'paw_type': area.paw_type, 'start_time': area.start_frame / self.fps, 'color': self.colors[area.paw_type] }) # 按时间排序 all_footprints.sort(key=lambda x: x['start_time']) # 2. 识别步序模式 sequences = [] current_seq = [] sequence_labels = [] # 存储每个序列的标签和时间点 for fp in all_footprints: if not current_seq and fp['paw_type'] == 'LH': current_seq = [fp] elif current_seq: current_seq.append(fp) if len(current_seq) == 4: # 获取步序模式 pattern = '-'.join([p['paw_type'] for p in current_seq]) if pattern in GAIT_PATTERNS: sequences.append(current_seq) sequence_labels.append(( current_seq[0]['start_time'], # LH 时间点 GAIT_PATTERNS[pattern] # 步序类型 )) current_seq = [] # 3. 绘制步态序列图 plt.figure(figsize=(15, 4)) # 获取时间范围 time_min, time_max = self._get_time_range() plt.xlim(time_min, time_max) # 设置Y轴位置 y_positions = {'RF': 4, 'RH': 3, 'LF': 2, 'LH': 1} # 绘制所有足印点和连线 for i, footprint in enumerate(all_footprints): y = y_positions[footprint['paw_type']] x = footprint['start_time'] # 绘制圆点 plt.scatter(x, y, c=footprint['color'], s=100, alpha=0.6, marker='o', label=footprint['paw_type'] if i < 4 else "") # 添加连线 if i < len(all_footprints) - 1: next_print = all_footprints[i+1] plt.plot([x, next_print['start_time']], [y, y_positions[next_print['paw_type']]], '--', color='gray', alpha=0.3) # 在LH点下方添加序列标签 for time, pattern in sequence_labels: plt.text(time, 0.5, pattern, rotation=45, ha='right', va='top', fontsize=8) plt.yticks(list(y_positions.values()), list(y_positions.keys())) plt.xlabel('Time (seconds)') plt.title('Gait Pattern') plt.grid(True, alpha=0.3) # 保存图片 plt.savefig(f'{self.result_dir}/plots/gait_sequence.png', dpi=300, bbox_inches='tight') plt.close() print("步态序列分析完成!") return sequence_labels def analyze_pressure_timeline(self): """分析并绘制足印压力时序图""" print("\n开始分析足印压力时序图...") if not self.video_path: raise ValueError("未指定视频文件路径") # 打开视频 cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): raise ValueError(f"无法打开视频文件: {self.video_path}") # 设置压力阈值 pressure_threshold = 5 # 1. 按爪子类型分组并按时间排序 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append({ 'start_frame': max(0, area.start_frame - self.pre_delay_frames), 'end_frame': area.end_frame + self.post_delay_frames, 'original_start': area.start_frame, 'original_end': area.end_frame, 'position': area.position }) # 2. 创建图表 plt.figure(figsize=(15, 8)) for i, (paw_type, footprints) in enumerate(paw_groups.items()): plt.subplot(4, 1, i+1) for footprint in footprints: # 初始化变量 max_pressures = [0] avg_pressures = [0] times = [footprint['start_frame'] / self.fps] for frame_idx in range(footprint['start_frame'], footprint['end_frame']): cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if not ret: continue x = max(0, footprint['position']['x']) y = max(0, footprint['position']['y']) w = footprint['position']['width'] h = footprint['position']['height'] roi = frame[y:y+h, x:x+w] if roi.size > 0: # 只获取G通道的值 g_values = roi[:, :, 1] max_pressures.append(float(np.max(g_values))) avg_pressures.append(float(np.mean(g_values))) times.append(frame_idx / self.fps) # 添加结束点 max_pressures.append(0) avg_pressures.append(0) times.append(footprint['end_frame'] / self.fps) # 绘制压力曲线 if times: plt.plot(times, max_pressures, '--', color=self.colors[paw_type], alpha=0.8, label='Max Pressure' if i == 0 else '') plt.plot(times, avg_pressures, '-', color=self.colors[paw_type], alpha=0.5, label='Avg Pressure' if i == 0 else '') plt.fill_between(times, 0, avg_pressures, color=self.colors[paw_type], alpha=0.2) # 添加图例 plt.legend(bbox_to_anchor=(1.05, 4.5), loc='upper left') # 调整子图间距 plt.tight_layout() # 保存图片 plt.savefig(f'{self.result_dir}/plots/pressure_timeline.png', dpi=300, bbox_inches='tight') plt.close() cap.release() print("Pressure timeline analysis completed!") def analyze_area_timeline(self): """分析并绘制足印面积时序图""" print("\n开始分析足印面积时序图...") if not self.video_path: raise ValueError("未指定视频文件路径") # 打开视频 cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): raise ValueError(f"无法打开视频文件: {self.video_path}") # 设置压力阈值(用于计算有效足印面积) pressure_threshold = 5 # 获取比例尺 (像素到毫米的转换) # 假设1厘米 = 10毫米 scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) * 10 # 1. 按爪子类型分组并按时间排序 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append({ 'start_frame': max(0, area.start_frame - self.pre_delay_frames), 'end_frame': area.end_frame + self.post_delay_frames, 'original_start': area.start_frame, 'original_end': area.end_frame, 'position': area.position }) # 2. 创建图表 - 增加高度以提供更多空间 plt.figure(figsize=(15, 10)) # 创建子图 for i, (paw_type, footprints) in enumerate(paw_groups.items()): ax = plt.subplot(4, 1, i+1) for footprint in footprints: # 初始化变量 areas = [0] times = [footprint['start_frame'] / self.fps] for frame_idx in range(footprint['start_frame'], footprint['end_frame']): cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if not ret: continue x = max(0, footprint['position']['x']) y = max(0, footprint['position']['y']) w = footprint['position']['width'] h = footprint['position']['height'] roi = frame[y:y+h, x:x+w] if roi.size > 0: # 只获取G通道的值 g_values = roi[:, :, 1] # 计算面积(高于阈值的像素数量) area_pixels = np.sum(g_values > pressure_threshold) # 转换为平方毫米 area_mm2 = area_pixels * (scale_factor ** 2) areas.append(float(area_mm2)) times.append(frame_idx / self.fps) # 添加结束点 areas.append(0) times.append(footprint['end_frame'] / self.fps) # 绘制面积曲线 if times: plt.plot(times, areas, '-', color=self.colors[paw_type], alpha=0.7, label='Area' if i == 0 else '') plt.fill_between(times, 0, areas, color=self.colors[paw_type], alpha=0.2) # 设置Y轴标签和标题 plt.ylabel(f'{paw_type} Area (mm²)') plt.title(f'{paw_type} Footprint Area Timeline') plt.grid(True, alpha=0.3) # 只在最后一个子图上显示X轴标签 if i < 3: # 前三个子图 plt.setp(ax.get_xticklabels(), visible=False) # 添加图例 plt.legend(bbox_to_anchor=(1.05, 4.5), loc='upper left') # 添加X轴标签(仅在最后一个子图上) plt.xlabel('Time (seconds)') # 调整子图间距,增加垂直间距 plt.subplots_adjust(hspace=0.4) # 保存图片 plt.savefig(f'{self.result_dir}/plots/area_timeline.png', dpi=300, bbox_inches='tight') plt.close() cap.release() print("足印面积时序图分析完成!") def analyze_swing_metrics(self): """分析摆动相关指标""" print("\n开始分析摆动相关指标...") # 按爪子类型分组 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append(area) # 对每个爪子类型计算指标 swing_metrics = {paw_type: [] for paw_type in paw_groups} stride_durations = {paw_type: [] for paw_type in paw_groups} swing_percentages = {paw_type: [] for paw_type in paw_groups} for paw_type, footprints in paw_groups.items(): # 按时间排序 footprints.sort(key=lambda x: x.start_frame) for i in range(len(footprints) - 1): current_print = footprints[i] next_print = footprints[i + 1] # 计算摆动时长(从当前足印结束到下一个足印开始) swing_duration = (next_print.start_frame - current_print.end_frame) / self.fps swing_metrics[paw_type].append(swing_duration) # 计算完整步幅时长 stride_duration = (next_print.start_frame - current_print.start_frame) / self.fps stride_durations[paw_type].append(stride_duration) # 计算摆动百分比 swing_percentage = (swing_duration / stride_duration) * 100 swing_percentages[paw_type].append(swing_percentage) # 绘制摆动时长箱线图 self._plot_swing_stats(swing_metrics, "Swing Duration (s)", "swing_duration") # 绘制摆动百分比箱线图 self._plot_swing_stats(swing_percentages, "Swing Percentage (%)", "swing_percentage") # 保存统计数据 stats = { 'swing_duration': { paw_type: { 'mean': np.mean(durations), 'std': np.std(durations), 'min': np.min(durations), 'max': np.max(durations) } for paw_type, durations in swing_metrics.items() if durations }, 'swing_percentage': { paw_type: { 'mean': np.mean(percentages), 'std': np.std(percentages), 'min': np.min(percentages), 'max': np.max(percentages) } for paw_type, percentages in swing_percentages.items() if percentages }, 'stride_duration': { paw_type: { 'mean': np.mean(durations), 'std': np.std(durations), 'min': np.min(durations), 'max': np.max(durations) } for paw_type, durations in stride_durations.items() if durations } } # 保存到CSV self._save_swing_stats(stats) print("摆动相关指标分析完成!") return stats def _plot_swing_stats(self, data: Dict[str, List[float]], ylabel: str, filename: str): """绘制摆动相关指标的统计图""" plt.figure(figsize=(10, 6)) # 设置样式 plt.style.use('seaborn-v0_8-whitegrid') colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'} # 按照指定顺序处理数据 paw_order = ['RF', 'RH', 'LF', 'LH'] # 创建箱线图 box_plot = plt.boxplot([data[paw] for paw in paw_order], tick_labels=paw_order, patch_artist=True, medianprops={'color': 'black'}, flierprops={'marker': 'o', 'markerfacecolor': 'none', 'markersize': 4}) # 设置箱体颜色和异常值颜色 for patch, paw in zip(box_plot['boxes'], paw_order): patch.set_facecolor(colors[paw]) patch.set_alpha(0.6) # 设置对应的异常值颜色 for flier in box_plot['fliers']: flier.set_color(colors[paw]) # 添加数据点 for i, paw in enumerate(paw_order, 1): if paw in data: plt.scatter([i] * len(data[paw]), data[paw], color=colors[paw], alpha=0.4, s=30) plt.title(f'{ylabel} Distribution by Paw Type', pad=20) plt.xlabel('Paw Type', labelpad=10) plt.ylabel(ylabel, labelpad=10) plt.grid(True, axis='y', linestyle='--', alpha=0.3) plt.tight_layout() plt.savefig(f'{self.result_dir}/plots/{filename}.png', dpi=300, bbox_inches='tight') plt.close() def _save_swing_stats(self, stats: Dict): """保存摆动相关指标的统计数据到CSV""" for metric, data in stats.items(): df = pd.DataFrame.from_dict( {(i, j): data[i][j] for i in data.keys() for j in data[i].keys()}, orient='index' ) # 根据指标类型设置不同的精度 if metric in ['swing_duration', 'stride_duration']: df = df.round(3) # 时间相关,3位小数 else: df = df.round(1) # 其他指标,1位小数 df.to_csv(f'{self.result_dir}/data/{metric}_stats.csv') def analyze_stride_length(self): """分析步幅长度""" print("\n开始分析步幅长度...") # 按爪子类型分组 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append(area) # 计算每个爪子类型的步幅长度 stride_lengths = {paw_type: [] for paw_type in paw_groups} for paw_type, footprints in paw_groups.items(): # 按时间排序 footprints.sort(key=lambda x: x.start_frame) for i in range(len(footprints) - 1): current_print = footprints[i] next_print = footprints[i + 1] # 计算两个足印中心点之间的距离 current_x = current_print.position['x'] + current_print.position['width'] / 2 next_x = next_print.position['x'] + next_print.position['width'] / 2 stride_length = abs(next_x - current_x) # 使用绝对值 stride_lengths[paw_type].append(stride_length) # 绘制步幅长度统计图 self._plot_stride_stats(stride_lengths) # 计算统计数据 stats = { paw_type: { 'mean': np.mean(lengths), 'std': np.std(lengths), 'min': np.min(lengths), 'max': np.max(lengths) } for paw_type, lengths in stride_lengths.items() if lengths } # 保存统计数据 df = pd.DataFrame.from_dict( {(i, j): stats[i][j] for i in stats.keys() for j in stats[i].keys()}, orient='index' ) df.to_csv(f'{self.result_dir}/data/stride_length_stats.csv') print("步幅长度分析完成!") return stats def _plot_stride_stats(self, stride_lengths: Dict[str, List[float]]): """绘制步幅长度统计图""" plt.figure(figsize=(10, 6)) plt.style.use('seaborn-v0_8-whitegrid') colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'} paw_order = ['RF', 'RH', 'LF', 'LH'] box_plot = plt.boxplot([stride_lengths[paw] for paw in paw_order], tick_labels=paw_order, patch_artist=True, medianprops={'color': 'black'}, flierprops={'marker': 'o', 'markerfacecolor': 'none', 'markersize': 4}) # 设置箱体颜色 for patch, paw in zip(box_plot['boxes'], paw_order): patch.set_facecolor(colors[paw]) patch.set_alpha(0.6) # 添加数据点 for i, paw in enumerate(paw_order, 1): plt.scatter([i] * len(stride_lengths[paw]), stride_lengths[paw], color=colors[paw], alpha=0.4, s=30) plt.title('Stride Length Distribution by Paw Type', pad=20) plt.xlabel('Paw Type', labelpad=10) plt.ylabel('Stride Length (pixels)', labelpad=10) plt.grid(True, axis='y', linestyle='--', alpha=0.3) plt.tight_layout() plt.savefig(f'{self.result_dir}/plots/stride_length.png', dpi=300, bbox_inches='tight') plt.close() def analyze_stance_width(self): """分析触地宽度(前后肢之间的左右距离)""" print("\n开始分析触地宽度...") # 按爪子类型分组 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append(area) # 计算前肢和后肢的触地宽度 front_widths = [] # RF和LF之间的宽度 hind_widths = [] # RH和LH之间的宽度 # 按时间排序 for paw_type in paw_groups: paw_groups[paw_type].sort(key=lambda x: x.start_frame) # 计算每个时间点的触地宽度 for rf in paw_groups['RF']: rf_center_x = rf.position['x'] + rf.position['width'] / 2 rf_frame = rf.start_frame # 寻找最近时间的LF足印 closest_lf = min(paw_groups['LF'], key=lambda x: abs(x.start_frame - rf_frame), default=None) if closest_lf and abs(closest_lf.start_frame - rf_frame) < 10: # 限制时间差 lf_center_x = closest_lf.position['x'] + closest_lf.position['width'] / 2 front_width = abs(rf_center_x - lf_center_x) front_widths.append(front_width) # 同样计算后肢宽度 for rh in paw_groups['RH']: rh_center_x = rh.position['x'] + rh.position['width'] / 2 rh_frame = rh.start_frame closest_lh = min(paw_groups['LH'], key=lambda x: abs(x.start_frame - rh_frame), default=None) if closest_lh and abs(closest_lh.start_frame - rh_frame) < 10: lh_center_x = closest_lh.position['x'] + closest_lh.position['width'] / 2 hind_width = abs(rh_center_x - lh_center_x) hind_widths.append(hind_width) # 计算统计数据 stats = { 'front_width': { 'mean': np.mean(front_widths), 'std': np.std(front_widths), 'min': np.min(front_widths), 'max': np.max(front_widths) } if front_widths else {}, 'hind_width': { 'mean': np.mean(hind_widths), 'std': np.std(hind_widths), 'min': np.min(hind_widths), 'max': np.max(hind_widths) } if hind_widths else {} } # 绘制统计图 self._plot_stance_width_stats(front_widths, hind_widths) # 保存统计数据 df = pd.DataFrame.from_dict(stats, orient='index') df.to_csv(f'{self.result_dir}/data/stance_width_stats.csv') print("触地宽度分析完成!") return stats def _plot_stance_width_stats(self, front_widths: List[float], hind_widths: List[float]): """绘制触地宽度统计图""" plt.figure(figsize=(8, 6)) plt.style.use('seaborn-v0_8-whitegrid') # 使用前肢和后肢的对应颜色 colors = { 'Front': self.colors['RF'], # 使用前肢蓝色 'Hind': self.colors['RH'] # 使用后肢紫色 } data = [front_widths, hind_widths] labels = ['Front', 'Hind'] box_plot = plt.boxplot(data, tick_labels=labels, patch_artist=True, medianprops={'color': 'black'}, flierprops={'marker': 'o', 'markerfacecolor': 'none', 'markersize': 4}) # 设置箱体颜色 for patch, label in zip(box_plot['boxes'], labels): patch.set_facecolor(colors[label]) patch.set_alpha(0.6) # 添加数据点 for i, (widths, label) in enumerate(zip([front_widths, hind_widths], labels), 1): plt.scatter([i] * len(widths), widths, color=colors[label], alpha=0.4, s=30) plt.title('Stance Width Distribution', pad=20) plt.xlabel('Limb Type', labelpad=10) plt.ylabel('Width (pixels)', labelpad=10) plt.grid(True, axis='y', linestyle='--', alpha=0.3) plt.tight_layout() plt.savefig(f'{self.result_dir}/plots/stance_width.png', dpi=300, bbox_inches='tight') plt.close() def analyze_paw_area(self): """分析脚爪面积(基于G通道的压力区域)""" print("\n开始分析脚爪面积...") # 打开视频 cap = cv2.VideoCapture('exp_videos/Exp2.mp4') if not cap.isOpened(): raise ValueError("Cannot open video file") # 按爪子类型分组 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append(area) # 计算每个足印的有效面积 paw_areas = {paw_type: [] for paw_type in paw_groups} # 设置压力阈值(G通道) pressure_threshold = 5 for paw_type, footprints in paw_groups.items(): for footprint in footprints: # 初始化变量 max_area = 0 max_intensity = 0 avg_intensity = 0 # 获取足印最大压力帧 for frame_idx in range(footprint.start_frame, footprint.end_frame): cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if not ret: continue # 提取足印区域 x = footprint.position['x'] y = footprint.position['y'] w = footprint.position['width'] h = footprint.position['height'] # 确保坐标在有效范围内 x = max(0, x) y = max(0, y) roi = frame[y:y+h, x:x+w] if roi.size > 0: # 提取G通道并计算有效面积 g_channel = roi[:, :, 1] current_area = np.sum(g_channel > pressure_threshold) max_area = max(max_area, current_area) max_intensity = max(max_intensity, np.max(g_channel)) avg_intensity = max(avg_intensity, np.mean(g_channel)) if max_area > 0: paw_areas[paw_type].append(max_area) # 计算统计数据 stats = { paw_type: { 'mean': np.mean(areas), 'std': np.std(areas), 'min': np.min(areas), 'max': np.max(areas) } for paw_type, areas in paw_areas.items() if areas } # 绘制统计图 self._plot_paw_area_stats(paw_areas) # 保存统计数据 df = pd.DataFrame.from_dict( {(i, j): stats[i][j] for i in stats.keys() for j in stats[i].keys()}, orient='index' ) df.to_csv(f'{self.result_dir}/data/paw_area_stats.csv') cap.release() print("脚爪面积分析完成!") return stats def _plot_paw_area_stats(self, paw_areas: Dict[str, List[float]]): """绘制脚爪面积统计图""" plt.figure(figsize=(10, 6)) plt.style.use('seaborn-v0_8-whitegrid') colors = {'RF': 'red', 'RH': 'blue', 'LF': 'green', 'LH': 'purple'} paw_order = ['RF', 'RH', 'LF', 'LH'] box_plot = plt.boxplot([paw_areas[paw] for paw in paw_order], tick_labels=paw_order, patch_artist=True, medianprops={'color': 'black'}, flierprops={'marker': 'o', 'markerfacecolor': 'none', 'markersize': 4}) # 设置箱体颜色 for patch, paw in zip(box_plot['boxes'], paw_order): patch.set_facecolor(colors[paw]) patch.set_alpha(0.6) # 添加数据点 for i, paw in enumerate(paw_order, 1): plt.scatter([i] * len(paw_areas[paw]), paw_areas[paw], color=colors[paw], alpha=0.4, s=30) plt.title('Paw Area Distribution by Paw Type', pad=20) plt.xlabel('Paw Type', labelpad=10) plt.ylabel('Area (pixels²)', labelpad=10) plt.grid(True, axis='y', linestyle='--', alpha=0.3) plt.tight_layout() plt.savefig(f'{self.result_dir}/plots/paw_area.png', dpi=300, bbox_inches='tight') plt.close() def generate_detailed_table(self): print("\n开始生成详细数据表...") if not self.video_path: raise ValueError("未指定视频文件路径") # 添加调试日志 print(f"\n检查关键点数据:") print(f"record_params keys: {self.record_params.keys()}") if 'bodyKeypoints' in self.record_params: print(f"bodyKeypoints 数量: {len(self.record_params['bodyKeypoints'])}") if self.record_params['bodyKeypoints']: print(f"第一个关键点数据示例: {self.record_params['bodyKeypoints'][0]}") else: print("未找到 bodyKeypoints 数据") # 创建帧到关键点的映射 keypoints_by_frame = {} for kp_data in self.record_params.get('bodyKeypoints', []): frame_id = kp_data['frame_id'] keypoints_by_frame[frame_id] = kp_data['keypoints'] print(f"处理后的关键点帧数: {len(keypoints_by_frame)}") # 打开视频获取足印图像 cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): raise ValueError(f"无法打开视频文件: {self.video_path}") detailed_data = [] movement_data = [] # 新增:存储运动方向数据 # 按时间顺序排序所有足印 sorted_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame) for area in sorted_footprints: # 1. 获取基本信息 footprint_data = { 'paw_type': area.paw_type, 'initial_contact': area.start_frame / self.fps, 'stand_duration': (area.end_frame - area.start_frame) / self.fps, 'max_contact': area.end_frame / self.fps } # 2. 提取中间帧的图像 middle_frame = area.start_frame + (area.end_frame - area.start_frame) // 2 cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) ret, frame = cap.read() if ret: # 获取该帧的关键点 if middle_frame in keypoints_by_frame: kp = keypoints_by_frame[middle_frame] nose = kp['nose'] tail_base = kp['tail_base'] # 计算全身图像的边界(增加10%余量) x_min = min(nose[0], tail_base[0]) x_max = max(nose[0], tail_base[0]) margin = (x_max - x_min) * 0.1 x_min = max(0, int(x_min - margin)) x_max = min(frame.shape[1], int(x_max + margin)) # 计算关键点在截取图片中的相对坐标 nose_relative_x = int(nose[0] - x_min) nose_relative_y = int(nose[1]) # y坐标保持不变 tail_base_relative_x = int(tail_base[0] - x_min) tail_base_relative_y = int(tail_base[1]) # y坐标保持不变 # 提取全身图像 full_body_roi = frame[0:frame.shape[0], x_min:x_max] _, buffer_full = cv2.imencode('.png', full_body_roi) footprint_data['image_fullbody'] = base64.b64encode(buffer_full).decode('utf-8') # 添加关键点完整坐标 footprint_data['keypoints'] = { 'nose': {'x': nose_relative_x, 'y': nose_relative_y}, 'tail_base': {'x': tail_base_relative_x, 'y': tail_base_relative_y} } # 提取足印ROI(原有代码) x = max(0, area.position['x']) y = max(0, area.position['y']) w = area.position['width'] h = area.position['height'] roi = frame[y:y+h, x:x+w] # 只保留G通道 roi_g = roi.copy() roi_g[:, :, 0] = 0 roi_g[:, :, 2] = 0 _, buffer = cv2.imencode('.png', roi_g) footprint_data['image'] = base64.b64encode(buffer).decode('utf-8') footprint_data['frame_id'] = middle_frame # 3. 计算足印尺寸 footprint_data['print_length'] = area.position['height'] / 100 # 转换为厘米 footprint_data['print_width'] = area.position['width'] / 100 # 转换为厘米 # 4. 计算压力区域和强度 max_intensity = 0 avg_intensity = 0 area_size = 0 pressure_threshold = 30 for frame_idx in range(area.start_frame, area.end_frame): cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if ret: roi = frame[y:y+h, x:x+w] if roi.size > 0: g_channel = roi[:, :, 1] current_area = np.sum(g_channel > pressure_threshold) area_size = max(area_size, current_area) max_intensity = max(max_intensity, np.max(g_channel)) avg_intensity = max(avg_intensity, np.mean(g_channel)) footprint_data['print_area'] = area_size / 100 # 转换为平方厘米 footprint_data['max_intensity'] = int(max_intensity) footprint_data['average_intensity'] = round(avg_intensity, 2) detailed_data.append(footprint_data) cap.release() # 计算平滑的运动方向 movement_angles = self._calculate_smooth_movement_angles() for frame_id, angle in movement_angles.items(): movement_data.append({ 'frame_id': frame_id, 'movement_angle': angle }) # 创建最终的数据结构 output_data = { "base_footprint_data": detailed_data, "movement_direction_data": movement_data # 新增:作为并列的数据 } # 保存为JSON文件 output_path = f'{self.result_dir}/data/detailed_footprint_data.json' with open(output_path, 'w', encoding='utf-8') as f: json.dump(output_data, f, indent=2, ensure_ascii=False) print(f"详细数据表已生成:{output_path}") # 添加验证日志 print(f"\n数据统计:") print(f"- 足印数据条数: {len(detailed_data)}") print(f"- 运动方向数据条数: {len(movement_data)}") print(f"- 包含全身图像的足印数: {sum(1 for d in detailed_data if 'image_fullbody' in d)}") print(f"- 包含足印图像的足印数: {sum(1 for d in detailed_data if 'image' in d)}") # 检查第一条数据的结构(去除图像数据以便打印) if detailed_data: sample_data = detailed_data[0].copy() if 'image' in sample_data: sample_data['image'] = f"[base64 string of length {len(sample_data['image'])}]" if 'image_fullbody' in sample_data: sample_data['image_fullbody'] = f"[base64 string of length {len(sample_data['image_fullbody'])}]" print(f"\n示例数据结构:") print(json.dumps(sample_data, indent=2)) return output_data def _calculate_smooth_movement_angles(self): """计算平滑的运动方向角度""" # 收集所有mid点 mid_points = [] frame_ids = [] for kp_data in self.record_params.get('bodyKeypoints', []): frame_ids.append(kp_data['frame_id']) mid_points.append(kp_data['keypoints']['mid']) if not mid_points: return {} # 转换为numpy数组 points = np.array(mid_points) # 使用Savitzky-Golay滤波器平滑轨迹 window_length = min(len(points), 15) # 窗口长度必须是奇数 if window_length % 2 == 0: window_length -= 1 if window_length >= 3: smooth_x = savgol_filter(points[:, 0], window_length, 3) smooth_y = savgol_filter(points[:, 1], window_length, 3) else: smooth_x = points[:, 0] smooth_y = points[:, 1] # 计算每个点的切线角度 angles = {} for i in range(len(smooth_x)-1): dx = smooth_x[i+1] - smooth_x[i] dy = smooth_y[i+1] - smooth_y[i] angle = np.degrees(np.arctan2(dy, dx)) angles[frame_ids[i]] = angle # 处理最后一个点 if frame_ids: angles[frame_ids[-1]] = angles[frame_ids[-2]] if frame_ids[-2] in angles else 0 return angles def generate_collection_table(self): """生成采集数据表格(Overview) 从record_params获取:实验名称、开始时间、方向等信息 计算得出:运行时长、平均速度、步数、步频 """ # 获取输入参数(使用默认值避免报错) direction_map = { "left_to_right": "L to R", "right_to_left": "R to L" } input_params = { "name": self.record_params.get("name", "未命名记录"), "start_time": self.record_params.get("start_time", datetime.now().strftime("%Y-%m-%d %H:%M:%S")), "direction": direction_map.get(self.record_params.get("direction", "left_to_right"), "L to R"), "walkway_length": f"{self.record_params.get('actual_length', 20)} cm", # 使用actual_length作为步道长度 "walkway_width": f"{self.record_params.get('crop_height', 152)/8:.1f} cm", # 使用crop_height估算步道宽度 "weight": f"{self.record_params.get('weight', 0)} g" # 添加体重信息 } # 计算运行时长(从第一个足印到最后一个足印的时间) sorted_footprints = sorted(self.footprint_areas, key=lambda x: x.start_frame) if sorted_footprints: first_frame = sorted_footprints[0].start_frame last_frame = sorted_footprints[-1].end_frame duration_seconds = (last_frame - first_frame) / self.fps # 计算步数 steps_count = len(sorted_footprints) # 计算步频(步数/时间) cadence = steps_count / duration_seconds if duration_seconds > 0 else 0 # 计算平均速度(步道长度/时间) try: walkway_length_cm = float(input_params["walkway_length"].split()[0]) # 提取数字部分 average_speed = walkway_length_cm / duration_seconds except (ValueError, IndexError): average_speed = 0 else: duration_seconds = 0 steps_count = 0 cadence = 0 average_speed = 0 # 组合所有数据 collection_data = { # 从输入参数获取的数据 "name": input_params["name"], "start_time": input_params["start_time"], "direction": input_params["direction"], "walkway_length": input_params["walkway_length"], "walkway_width": input_params["walkway_width"], "weight": input_params["weight"], # 计算得出的数据 "run_duration": f"{duration_seconds:.1f} s", "average_speed": f"{average_speed:.3f} cm/s", "steps_count": steps_count, "cadence": f"{cadence:.1f} steps/sec" } return collection_data def generate_paw_statistics(self) -> dict: """生成足印统计数据表格,按照不同类别组织数据 Returns: dict: 包含每个足印的统计数据,按类别分组 """ print("\n开始生成足印统计数据...") # 获取比例尺 (pixels to cm) scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) # 按爪子类型分组并按时间排序 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append(area) for paw_type in paw_groups: paw_groups[paw_type].sort(key=lambda x: x.start_frame) # 打开视频获取压力数据 cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): raise ValueError("无法打开视频文件") paw_stats = {} for paw_type, footprints in paw_groups.items(): paw_stats[paw_type] = [] for i, footprint in enumerate(footprints): # 基础数据计算 x = footprint.position['x'] y = footprint.position['y'] width = footprint.position['width'] height = footprint.position['height'] # 压力数据计算 max_intensity = 0 mean_intensity = 0 min_intensity = 255 max_area = 0 pressure_threshold = 5 # 遍历足印的所有帧获取压力数据 for frame_idx in range(footprint.start_frame, footprint.end_frame + 1): cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if ret: roi = frame[y:y+height, x:x+width] if roi.size > 0: g_channel = roi[:, :, 1] current_max = np.max(g_channel) current_mean = np.mean(g_channel) # 修改这里:添加安全检查 pressure_pixels = g_channel[g_channel > pressure_threshold] if pressure_pixels.size > 0: current_min = np.min(pressure_pixels) min_intensity = min(min_intensity, current_min) current_area = np.sum(g_channel > pressure_threshold) max_intensity = max(max_intensity, current_max) mean_intensity = max(mean_intensity, current_mean) max_area = max(max_area, current_area) # 计算支撑和摆动相关数据 initial_contact = footprint.start_frame / self.fps max_contact = footprint.end_frame / self.fps stand_duration = (footprint.end_frame - footprint.start_frame) / self.fps swing_time = 0 swing_speed = 0 stride_length = 0 if i < len(footprints) - 1: next_print = footprints[i + 1] swing_time = (next_print.start_frame - footprint.end_frame) / self.fps stride_length = abs( (next_print.position['x'] + next_print.position['width']/2) - (x + width/2) ) * scale_factor if swing_time > 0: swing_speed = stride_length / swing_time # 组织数据结构 footprint_data = { # Footprint Basic Data "basic_data": { "x_position": round(x * scale_factor, 1), "y_position": round(y * scale_factor, 1), "length": round(height * scale_factor, 1), "width": round(width * scale_factor, 1), "max_area": round(max_area * (scale_factor ** 2), 1) }, # Time Points "time_points": { "initial_contact": round(initial_contact, 3), "stand": round(stand_duration, 3), "max_contact": round(max_contact, 3) }, # Intensity "intensity": { "max_intensity": int(max_intensity), "min_intensity": int(min_intensity) if min_intensity < 255 else 0, "mean_intensity": round(mean_intensity, 1), "max_contact_ratio": round((max_intensity / 255) * 100, 1), "max_intensity_ratio": round((mean_intensity / max_intensity) * 100, 1) if max_intensity > 0 else 0 }, # Support "support": { "support_duration": round(stand_duration, 3), "support_ratio": round((stand_duration / (stand_duration + swing_time)) * 100, 1) if (stand_duration + swing_time) > 0 else 0, "initial_bipedal_support": round(initial_contact, 3), "final_bipedal_support": round(max_contact, 3) }, # Toe "toe": { "toe_spread": round(width * scale_factor, 1), "intermediate_toe_spread": round(width * scale_factor * 0.5, 1), "angle_between_motion": 10, # 简化处理,使用固定值 "angle_between_body_axis": 10 # 简化处理,使用固定值 }, # Other "other": { "swing": round(swing_time, 3), "swing_speed": round(swing_speed, 1), "stride_length": round(stride_length, 1), "step_cycle": round(stand_duration + swing_time, 3), "body_speed": round(stride_length / (stand_duration + swing_time), 1) if (stand_duration + swing_time) > 0 else 0 } } paw_stats[paw_type].append(footprint_data) cap.release() print("足印统计数据生成完成!") return paw_stats def generate_step_sequence_table(self) -> dict: """生成步序数据统计表格 Returns: dict: 包含步序统计数据的字典 """ print("\n开始生成步序统计数据...") # 定义步态模式 GAIT_PATTERNS = { 'LH-RF-RH-LF': 'Aa', 'LH-LF-RH-RF': 'Ab', 'LH-RF-LF-RH': 'Ca', 'LH-RH-LF-RF': 'Cb', 'LH-RH-RF-LF': 'Ra', 'LH-LF-RF-RH': 'Rb' } # 收集所有足印的开始时间并按时间排序 all_footprints = [] for area in self.footprint_areas: all_footprints.append({ 'paw_type': area.paw_type, 'start_time': area.start_frame / self.fps }) # 按时间排序 all_footprints.sort(key=lambda x: x['start_time']) # 识别步序模式 sequences = [] current_seq = [] for fp in all_footprints: if not current_seq and fp['paw_type'] == 'LH': current_seq = [fp] elif current_seq: current_seq.append(fp) if len(current_seq) == 4: # 获取步序模式 pattern = '-'.join([p['paw_type'] for p in current_seq]) if pattern in GAIT_PATTERNS: sequences.append(GAIT_PATTERNS[pattern]) current_seq = [] # 统计各种模式的出现次数 pattern_counts = { 'Aa': 0, 'Ab': 0, 'Ca': 0, 'Cb': 0, 'Ra': 0, 'Rb': 0 } for seq in sequences: if seq in pattern_counts: pattern_counts[seq] += 1 total_sequences = len(sequences) # 生成统计数据 sequence_stats = { "summary": { "total_sequences": total_sequences, "normal_sequences": sum(pattern_counts.values()), # 正常步序总数 "abnormal_sequences": total_sequences - sum(pattern_counts.values()) # 异常步序数 }, "patterns": [ { "pattern_code": pattern, "count": count, "percentage": round((count / total_sequences * 100), 1) if total_sequences > 0 else 0 } for pattern, count in pattern_counts.items() ], "raw_sequence": sequences # 原始步序列表 } print("步序统计数据生成完成!") return sequence_stats def generate_foot_spacing_table(self) -> dict: """生成足间距数据统计表格 Returns: dict: 包含足间距统计数据的字典 """ print("\n开始生成足间距统计数据...") # 获取比例尺 (pixels to cm) scale_factor = self.record_params.get('actual_length', 20) / self.record_params.get('scale_length', 152) # 按爪子类型分组并按时间排序 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append(area) # 计算每个爪子的中心点坐标 def get_center_point(footprint): x = footprint.position['x'] + footprint.position['width'] / 2 y = footprint.position['y'] + footprint.position['height'] / 2 return (x, y) # 计算两点之间的距离 def calculate_distance(p1, p2): return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2) * scale_factor # 初始化距离统计 spacing_stats = { "front_paws": { "count": 0, "mean_spacing": 0, "distances": [] }, "hind_paws": { "count": 0, "mean_spacing": 0, "distances": [] }, "paw_pairs": { "RF": { "count": 0, "mean_spacing": 0, "standard_deviation": 0, "distances": [] }, "LF": { "count": 0, "mean_spacing": 0, "standard_deviation": 0, "distances": [] }, "LH": { "count": 0, "mean_spacing": 0, "standard_deviation": 0, "distances": [] }, "RH": { "count": 0, "mean_spacing": 0, "standard_deviation": 0, "distances": [] } } } # 计算前爪之间的距离 if len(paw_groups['RF']) > 0 and len(paw_groups['LF']) > 0: for rf in paw_groups['RF']: rf_center = get_center_point(rf) for lf in paw_groups['LF']: lf_center = get_center_point(lf) distance = calculate_distance(rf_center, lf_center) spacing_stats["front_paws"]["distances"].append(distance) # 计算后爪之间的距离 if len(paw_groups['RH']) > 0 and len(paw_groups['LH']) > 0: for rh in paw_groups['RH']: rh_center = get_center_point(rh) for lh in paw_groups['LH']: lh_center = get_center_point(lh) distance = calculate_distance(rh_center, lh_center) spacing_stats["hind_paws"]["distances"].append(distance) # 计算各爪子对之间的距离 for paw_type in ['RF', 'LF', 'LH', 'RH']: if len(paw_groups[paw_type]) > 1: for i in range(len(paw_groups[paw_type]) - 1): current = paw_groups[paw_type][i] next_paw = paw_groups[paw_type][i + 1] distance = calculate_distance( get_center_point(current), get_center_point(next_paw) ) spacing_stats["paw_pairs"][paw_type]["distances"].append(distance) # 计算统计数据 # 前爪统计 if spacing_stats["front_paws"]["distances"]: distances = spacing_stats["front_paws"]["distances"] spacing_stats["front_paws"].update({ "count": len(distances), "mean_spacing": round(np.mean(distances), 1) }) # 后爪统计 if spacing_stats["hind_paws"]["distances"]: distances = spacing_stats["hind_paws"]["distances"] spacing_stats["hind_paws"].update({ "count": len(distances), "mean_spacing": round(np.mean(distances), 1) }) # 各爪子对统计 for paw_type in ['RF', 'LF', 'LH', 'RH']: if spacing_stats["paw_pairs"][paw_type]["distances"]: distances = spacing_stats["paw_pairs"][paw_type]["distances"] mean_spacing = np.mean(distances) std_dev = np.std(distances) spacing_stats["paw_pairs"][paw_type].update({ "count": len(distances), "mean_spacing": round(mean_spacing, 1), "standard_deviation": round((std_dev / mean_spacing) * 100, 1) # 转换为百分比 }) print("足间距统计数据生成完成!") return spacing_stats def generate_support_table(self) -> dict: """生成支撑统计数据(带中英文字段)""" # 中英对照表 support_type_mapping = { "diagonal": "对角支撑", # 对侧交叉支撑(如II-DH) "four": "四肢支撑", # 四肢同时支撑 "girdle": "同源支撑", # 同源双肢支撑(如II-GR) "lateral": "同侧支撑", # 同侧双肢支撑(如II-LR) "single": "单肢支撑", # 单肢独立支撑 "standing": "站立支撑", # 静止站立状态 "three": "三肢支撑", # 三肢同时支撑 "zero": "无支撑" # 无任何肢体支撑 } support_stats = { "support_sequence": [], "support_types": { "diagonal": 0.0, "four": 0.0, "girdle": 0.0, "lateral": 0.0, "single": 0.0, "standing": 0.0, "three": 0.0, "zero": 0.0 } } # 修复时间属性访问方式 all_footprints = sorted( [{ 'start_time': area.start_frame / self.fps, # 正确计算开始时间 'end_time': area.end_frame / self.fps, # 新增结束时间 'duration': (area.end_frame - area.start_frame) / self.fps, 'paw_type': area.paw_type } for area in self.footprint_areas], key=lambda x: x['start_time'] ) # 分析每个时间点的支撑情况 for fp in all_footprints: # 获取当前所有活动爪子 active_paws = [ p['paw_type'] for p in all_footprints if p['start_time'] <= fp['start_time'] < (p['start_time'] + p['duration']) ] support_count = len(active_paws) # 生成SupportFormula (1-4) support_formula = str(min(support_count, 4)) # 最大显示为4 # 生成FootfallFormula if support_count == 1: # 单肢模式 I-{爪子类型} footfall_code = f"I-{active_paws[0]}" elif support_count == 2: # 双肢模式 II-{类型}{组合} paw1, paw2 = sorted(active_paws) # 判断支撑类型 if (paw1[0] != paw2[0]) and (paw1[1] != paw2[1]): # 对侧交叉 type_code = "D" pos_code = paw1[1]+paw2[1] # 取前后位置 F/H elif paw1[0] == paw2[0]: # 同源支撑 type_code = "G" pos_code = paw1[0] # 取左右侧 R/L else: # 同侧支撑 type_code = "L" pos_code = paw1[0] # 取左右侧 R/L footfall_code = f"II-{type_code}{pos_code}" elif support_count == 3: # 三肢模式 III-{主导爪} lead_paw = min(active_paws) # 按字母顺序取第一个 footfall_code = f"III-{lead_paw}" else: footfall_code = "IV" # 记录支撑序列 support_stats["support_sequence"].append({ "start_time": round(fp['start_time'], 4), "duration": round(fp['duration'], 4), "support_formula": support_formula, "footfall_formula": footfall_code }) # 初始化支撑类型 support_type = "unknown" # 根据footfall_code判断支撑类型 if support_count == 0: support_type = "zero" elif support_count == 1: support_type = "single" elif support_count == 2: if footfall_code.startswith("II-D"): support_type = "diagonal" elif footfall_code.startswith("II-G"): support_type = "girdle" elif footfall_code.startswith("II-L"): support_type = "lateral" else: support_type = "unknown_dual" # 未知双肢类型 elif support_count == 3: support_type = "three" elif support_count == 4: support_type = "four" # 覆盖判断站立状态 if self._is_standing_state(fp['start_time']): support_type = "standing" # 统计持续时间 if support_type in support_stats["support_types"]: support_stats["support_types"][support_type] += fp['duration'] else: logging.warning(f"未知支撑类型: {support_type} 时间: {fp['start_time']}") continue # 计算百分比(如果需要) total_duration = sum(support_stats["support_types"].values()) if total_duration > 0: for k in support_stats["support_types"]: support_stats["support_types"][k] = round( (support_stats["support_types"][k] / total_duration) * 100, 4 ) print("足支撑统计数据生成完成!") return support_stats def generate_coordination_table(self) -> dict: """生成双足协调性数据统计表格 Returns: dict: 包含双足协调性统计数据的字典 """ print("\n开始生成双足协调性统计数据...") # 初始化协调性数据结构 coordination_stats = { "summary": { "standard_mean": 0, "standard_deviation": 0, "mistakes": 0 }, "pairs": { "LF-RH": [], # 左前-右后 "LF-LH": [], # 左前-左后 "LF-RF": [], # 左前-右前 "RH-LF": [], # 右后-左前 "RH-LH": [], # 右后-左后 "RH-RF": [] # 右后-右前 }, "coordination_sequence": [] # 详细的协调序列 } # 按爪子类型分组并按时间排序 paw_groups = {'RF': [], 'RH': [], 'LF': [], 'LH': []} for area in self.footprint_areas: if area.paw_type in paw_groups: paw_groups[area.paw_type].append({ 'paw_type': area.paw_type, 'start_time': area.start_frame / self.fps, 'duration': (area.end_frame - area.start_frame) / self.fps, 'end_time': area.end_frame / self.fps }) # 对每个组内的足印按时间排序 for paw_type in paw_groups: paw_groups[paw_type].sort(key=lambda x: x['start_time']) # 分析爪子对之间的协调性 def analyze_pair_coordination(anchor_prints, target_prints, pair_type): pair_stats = [] for anchor in anchor_prints: # 增加有效性检查 if anchor['duration'] <= 0 or anchor['start_time'] < 0: continue # 寻找时间窗口内的目标足印 valid_targets = [ t for t in target_prints if t['start_time'] >= anchor['start_time'] - 0.5 # 扩大时间窗口 and t['start_time'] <= anchor['end_time'] + 0.5 ] for target in valid_targets: # 修正计算方式(取消绝对值) time_diff = target['start_time'] - anchor['start_time'] value = (time_diff / anchor['duration']) * 100 # 修正异常类型判断 if time_diff < 0: # 目标足提前启动 peculiarity = "ANT" value = abs(value) elif 0 <= value <= 100: peculiarity = "NOR" else: # 超过100%为滞后 peculiarity = "DEL" # 修改异常代码 value = value % 100 # 添加有效性阈值 if abs(time_diff) > 2.0: # 超过2秒视为无效数据 continue coordination_entry = { "value": round(value, 4), "anchor_start": round(anchor['start_time'], 4), # 保持原有名称 "anchor_duration": round(anchor['duration'], 4), # 保持原有名称 "target_start": round(target['start_time'], 4), # 保持原有名称 "peculiarity": peculiarity } pair_stats.append(coordination_entry) break return pair_stats # 分析所有配对 pair_configs = [ ('LF', 'RH', 'LF-RH'), ('LF', 'LH', 'LF-LH'), ('LF', 'RF', 'LF-RF'), ('RH', 'LF', 'RH-LF'), ('RH', 'LH', 'RH-LH'), ('RH', 'RF', 'RH-RF') ] all_values = [] mistakes_count = 0 for anchor_type, target_type, pair_name in pair_configs: pair_stats = analyze_pair_coordination( paw_groups[anchor_type], paw_groups[target_type], pair_name ) coordination_stats["pairs"][pair_name] = pair_stats # 收集所有有效值用于计算总体统计 values = [stat["value"] for stat in pair_stats if stat["value"] > 0] all_values.extend(values) # 统计异常情况 mistakes_count += sum(1 for stat in pair_stats if stat["peculiarity"] != "NOR") # 计算总体统计数据 if all_values: coordination_stats["summary"].update({ "standard_mean": round(np.mean(all_values), 0), "standard_deviation": round(np.std(all_values), 0), "mistakes": mistakes_count }) print("双足协调性统计数据生成完成!") return coordination_stats def _is_standing_state(self, current_time: float) -> bool: """修复后的站立状态判断""" # 转换帧号为时间 start_time = current_time - 0.5 hind_prints = [ p for p in self.footprint_areas if p.paw_type in ('LH', 'RH') and (p.start_frame / self.fps) <= current_time <= (p.end_frame / self.fps) and (p.start_frame / self.fps) >= start_time ] # 持续时间计算修正 return ( len(hind_prints) == 2 and all((p.end_frame - p.start_frame)/self.fps >= 0.5 for p in hind_prints) ) def generate_3d_footprint_analysis(self): """生成3D足印分析数据""" print("\n开始生成3D足印可视化...") # 创建保存目录 footprint_3d_dir = f'{self.result_dir}/plots/footprints_3d' os.makedirs(footprint_3d_dir, exist_ok=True) html_dir = f'{self.result_dir}/plots/interactive_3d' os.makedirs(html_dir, exist_ok=True) # 生成静态3D图像 self._generate_3d_footprint_plots() # 生成交互式3D足印可视化 interactive_3d_data = self._generate_interactive_3d_footprints() # 创建索引页面 self._create_3d_footprint_index(html_dir) return interactive_3d_data def _generate_3d_footprint_plots(self): """为每个足印区域生成3D图像""" import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D import numpy as np import os # 创建保存目录 footprint_3d_dir = f'{self.result_dir}/plots/footprints_3d' os.makedirs(footprint_3d_dir, exist_ok=True) # 按cluster(area_id)分组足印 cluster_groups = {} for footprint in self.footprint_areas: if footprint.area_id not in cluster_groups: cluster_groups[footprint.area_id] = [] cluster_groups[footprint.area_id].append(footprint) # 处理每个cluster for cluster_id, footprints in cluster_groups.items(): # 按时间排序并选择中位数时间点的足印作为关键足印 sorted_footprints = sorted(footprints, key=lambda x: x.start_frame) key_footprint = sorted_footprints[len(sorted_footprints)//2] # 打开视频并提取足印图像 if self.video_path: cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): print(f"无法打开视频: {self.video_path}") continue # 获取足印中间帧 middle_frame = (key_footprint.start_frame + key_footprint.end_frame) // 2 cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) ret, frame = cap.read() if not ret: print(f"无法读取帧: {middle_frame}") cap.release() continue # 提取足印ROI x = max(0, key_footprint.position['x']) y = max(0, key_footprint.position['y']) w = key_footprint.position['width'] h = key_footprint.position['height'] if x+w > frame.shape[1] or y+h > frame.shape[0]: print(f"ROI超出图像范围: {x},{y},{w},{h}") cap.release() continue roi = frame[y:y+h, x:x+w] # 提取绿色通道作为强度图 green_channel = roi[:,:,1] # 确定方形区域大小(使用最长边) max_side = max(green_channel.shape[0], green_channel.shape[1]) square_patch = np.zeros((max_side, max_side), dtype=np.uint8) # 将原图放在方形中心 start_y = (max_side - green_channel.shape[0]) // 2 start_x = (max_side - green_channel.shape[1]) // 2 square_patch[start_y:start_y+green_channel.shape[0], start_x:start_x+green_channel.shape[1]] = green_channel # 创建网格 x = np.linspace(0, square_patch.shape[1]-1, square_patch.shape[1]) y = np.linspace(0, square_patch.shape[0]-1, square_patch.shape[0]) X, Y = np.meshgrid(x, y) # 创建3D图 fig = plt.figure(figsize=(10, 8)) ax = fig.add_subplot(111, projection='3d') # 绘制3D表面 surf = ax.plot_surface(X, Y, square_patch, cmap='viridis') # 设置标题和标签 paw_type = key_footprint.paw_type ax.set_title(f'Footprint 3D View - {paw_type} #{cluster_id}') ax.set_xlabel('X') ax.set_ylabel('Y') ax.set_zlabel('Intensity') # 添加颜色条 fig.colorbar(surf) # 保存图像 plt.savefig(f'{footprint_3d_dir}/footprint_{cluster_id}_{paw_type}.png', dpi=300, bbox_inches='tight') plt.close() cap.release() def _generate_interactive_3d_footprints(self): """生成交互式3D足印热图可视化并返回base64编码的HTML内容""" try: import plotly.graph_objects as go from plotly.subplots import make_subplots import numpy as np import os import cv2 from scipy.interpolate import griddata except ImportError: print("请安装必要的库: pip install plotly scipy") return {} # 创建保存目录 html_dir = f'{self.result_dir}/plots/interactive_3d' os.makedirs(html_dir, exist_ok=True) # 按cluster(area_id)分组足印 cluster_groups = {} for footprint in self.footprint_areas: if footprint.area_id not in cluster_groups: cluster_groups[footprint.area_id] = [] cluster_groups[footprint.area_id].append(footprint) # 存储HTML内容的字典 html_data = {} # 处理每个cluster生成单独的HTML for cluster_id, footprints in cluster_groups.items(): # 按时间排序并选择中位数时间点的足印作为关键足印 sorted_footprints = sorted(footprints, key=lambda x: x.start_frame) key_footprint = sorted_footprints[len(sorted_footprints)//2] paw_type = key_footprint.paw_type html_path = f'{html_dir}/footprint_{cluster_id}_{paw_type}.html' # 打开视频并提取足印图像 if self.video_path: cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): print(f"无法打开视频: {self.video_path}") continue # 获取足印中间帧 middle_frame = (key_footprint.start_frame + key_footprint.end_frame) // 2 cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) ret, frame = cap.read() if not ret: print(f"无法读取帧: {middle_frame}") cap.release() continue # 提取足印ROI x = max(0, key_footprint.position['x']) y = max(0, key_footprint.position['y']) w = key_footprint.position['width'] h = key_footprint.position['height'] if x+w > frame.shape[1] or y+h > frame.shape[0]: print(f"ROI超出图像范围: {x},{y},{w},{h}") cap.release() continue roi = frame[y:y+h, x:x+w] # 检查图像格式并处理 if len(roi.shape) == 2 or (len(roi.shape) == 3 and roi.shape[2] == 1): # 单通道图像(灰度图)- 直接使用 if len(roi.shape) == 3: gray = roi[:,:,0] else: gray = roi else: # 多通道图像(BGR或RGB)- 先提取绿色通道 hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) # 定义绿色的HSV范围 lower_green = np.array([40, 40, 40]) upper_green = np.array([80, 255, 255]) # 创建掩码 mask = cv2.inRange(hsv, lower_green, upper_green) # 应用掩码 green_only = cv2.bitwise_and(roi, roi, mask=mask) # 转换为灰度图 gray = cv2.cvtColor(green_only, cv2.COLOR_BGR2GRAY) # 应用高斯模糊平滑过渡 smooth_gray = ndimage.gaussian_filter(gray, sigma=1.5) # 过滤低值(底色) threshold = np.max(smooth_gray) * 0.1 if np.max(smooth_gray) > 0 else 0 filtered_gray = np.where(smooth_gray < threshold, 0, smooth_gray) # 确定方形区域大小(使用最长边) max_side = max(filtered_gray.shape[0], filtered_gray.shape[1]) square_patch = np.zeros((max_side, max_side), dtype=np.uint8) # 将原图放在方形中心 start_y = (max_side - filtered_gray.shape[0]) // 2 start_x = (max_side - filtered_gray.shape[1]) // 2 square_patch[start_y:start_y+filtered_gray.shape[0], start_x:start_x+filtered_gray.shape[1]] = filtered_gray # 增加分辨率(插值平滑) factor = 2 # 分辨率提高因子 new_size = max_side * factor # 原始坐标 y_old, x_old = np.mgrid[0:max_side, 0:max_side] # 新坐标 y_new, x_new = np.mgrid[0:max_side:complex(0, new_size), 0:max_side:complex(0, new_size)] # 执行插值 z_upscaled = griddata((y_old.flatten(), x_old.flatten()), square_patch.flatten(), (y_new, x_new), method='cubic', fill_value=0) # 再次应用平滑 z_upscaled = ndimage.gaussian_filter(z_upscaled, sigma=1) # 创建交互式3D图 fig = go.Figure(data=[go.Surface( z=z_upscaled, colorscale='Jet', # 热图色彩 colorbar=dict(title="强度"), contours = { "z": {"show": True, "start": 0, "end": 255, "size": 10} } )]) # 设置图表布局和标题 fig.update_layout( title=f'足印3D交互式热图 - {paw_type} #{cluster_id}', width=800, height=800, scene=dict( xaxis_title='X', yaxis_title='Y', zaxis_title='强度', aspectratio=dict(x=1, y=1, z=0.5), camera=dict( eye=dict(x=1.5, y=1.5, z=0.8) ) ), margin=dict(l=0, r=0, b=0, t=30) ) # 保存为HTML文件 fig.write_html( html_path, include_plotlyjs='cdn', full_html=True, config={ 'displayModeBar': True, 'editable': True, 'toImageButtonOptions': { 'format': 'png', 'filename': f'footprint_{cluster_id}_{paw_type}', 'height': 800, 'width': 800, 'scale': 2 } } ) # 读取HTML内容并转换为base64 with open(html_path, 'rb') as f: html_content = f.read() html_base64 = base64.b64encode(html_content).decode('utf-8') # 保存到字典 html_data[f"{cluster_id}_{paw_type}"] = { 'html_base64': html_base64, 'filename': f'footprint_{cluster_id}_{paw_type}.html', 'paw_type': paw_type, 'cluster_id': cluster_id } cap.release() print(f"交互式3D足印热图已保存至: {html_path}") # 创建索引页面 self._create_3d_footprint_index(html_dir) return html_data def _create_3d_footprint_index(self, html_dir): """创建3D足印可视化的HTML索引页面""" html_files = [f for f in os.listdir(html_dir) if f.endswith('.html') and f != 'index.html'] if not html_files: return index_html = f'{html_dir}/index.html' # 按足爪类型分组文件 paw_types = { 'leftFront': [], 'rightFront': [], 'leftHind': [], 'rightHind': [], 'unknown': [] } type_map = { 'LF': 'leftFront', 'RF': 'rightFront', 'LH': 'leftHind', 'RH': 'rightHind' } for filename in html_files: for paw_code, paw_name in type_map.items(): if paw_code in filename: paw_types[paw_name].append(filename) break else: paw_types['unknown'].append(filename) # 生成HTML内容 with open(index_html, 'w', encoding='utf-8') as f: f.write("""