|
|
from fastapi import FastAPI, UploadFile, Form, File, HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import json |
|
|
import tempfile |
|
|
import os |
|
|
from pathlib import Path |
|
|
import uvicorn |
|
|
from typing import Optional |
|
|
from pydantic import BaseModel |
|
|
import shutil |
|
|
import logging |
|
|
from datetime import datetime |
|
|
import base64 |
|
|
|
|
|
from gait_analyze import GaitAnalyzer |
|
|
from gait_analysis_report import GaitAnalysisReport |
|
|
|
|
|
|
|
|
# Send INFO-and-above records to both a log file and the console.
# The log file is opened as UTF-8 explicitly: the messages emitted by this
# module contain Chinese text, and the platform default encoding is not
# guaranteed to be able to represent it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('gait_analysis.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)

# Module-level logger used by the request handlers and helpers in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
# ASGI application object; the route decorators in this file register
# their handlers on it. The metadata strings appear in the generated API docs.
app = FastAPI(
    title="小鼠步态分析API",
    description="用于分析小鼠步态视频的REST API服务",
    version="1.0.0",
)
|
|
|
|
|
|
|
|
# CORS policy: accept every origin, method and header, with credentials
# enabled.
# NOTE(review): the FastAPI CORS documentation says wildcard values should not
# be combined with allow_credentials=True; confirm whether browser clients
# actually send credentials before tightening this.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
|
|
class AnalysisResponse(BaseModel):
    """Response envelope returned by the analysis endpoints in this file."""

    # Outcome marker; the handlers in this file set "success" or "error".
    status: str

    # Human-readable description of the outcome.
    message: str

    # Analysis payload; stays None when a handler has no data to return.
    data: Optional[dict] = None
|
|
|
|
|
class GaitAnalysisParams(BaseModel):
    """Parameter set for a gait analysis run.

    NOTE(review): no handler visible in this file references this model;
    confirm whether it is still used elsewhere.
    """

    # Location of the video to analyse.
    video_path: str

    # Location of the footprint JSON that accompanies the video.
    json_path: str

    # Delay values, presumably in milliseconds given the field names.
    pre_delay_ms: int = 30
    post_delay_ms: int = 30
|
|
|
|
|
@app.post("/api/v1/analysisFootVideo")
async def analysis_foot_video(
    video: UploadFile = File(...),
    params: str = Form(...)
):
    """Detect footprints in an uploaded video and return the footprint data.

    The upload is saved to a private scratch directory, ``GaitAnalyzer``
    detects the time range to analyse and processes the video, and the
    ``footprint_data.json`` it writes is returned together with the video
    metadata supplied by the client. A copy of the result (plus the
    analyzer's ``visualization`` folder, when present) is kept under
    ``results/<task_id>``.

    Args:
        video: Uploaded video file (multipart form field ``video``).
        params: JSON object encoded as a string. The keys read here are
            ``video_fps``, ``video_width``, ``video_height``,
            ``scale_length`` and ``actual_length``; missing keys become
            ``None`` in the response.

    Returns:
        AnalysisResponse: ``status="success"`` with the footprint data, or
        ``status="error"`` with a message describing the failure. Failures
        are reported in the response body instead of being raised.
    """
    task_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    logger.info(f"开始新的分析任务 {task_id}")

    try:
        analysis_params = json.loads(params)
        # Reject non-object JSON before any heavy work: the metadata lookup
        # further down needs a mapping, and failing here avoids processing
        # the whole video first.
        if not isinstance(analysis_params, dict):
            raise ValueError("params 必须是 JSON 对象")
        logger.info(f"任务 {task_id} 参数: {analysis_params}")

        # task_id only has one-second resolution, so two requests arriving in
        # the same second would share (and delete) each other's files if the
        # scratch directory were named after it alone. mkdtemp adds a unique
        # suffix.
        temp_root = Path("temp")
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"{task_id}_", dir=temp_root))

        try:
            # Persist the upload so the analyzer can read it from disk.
            video_path = temp_dir / "input_video.mp4"
            with open(video_path, "wb") as buffer:
                shutil.copyfileobj(video.file, buffer)
            logger.info(f"任务 {task_id} 视频保存成功")

            # NOTE(review): the analyzer calls below are synchronous and run
            # directly inside this coroutine.
            analyzer = GaitAnalyzer()

            logger.info(f"任务 {task_id} 开始检测时间范围")
            start_time, end_time = analyzer._detect_mouse_time_range(video_path)
            logger.info(f"任务 {task_id} 检测到时间范围: {start_time:.2f}s - {end_time:.2f}s")

            logger.info(f"任务 {task_id} 开始处理视频")
            analyzer.process_video(
                video_path,
                start_time=start_time,
                end_time=end_time,
                conf_thres=0.3,
                iou_thres=0.5
            )

            # Load the footprint data the analyzer wrote to its result folder.
            result_dir = Path(analyzer.result_dir)
            json_path = result_dir / "data" / "footprint_data.json"
            with open(json_path, 'r') as f:
                footprint_data = json.load(f)

            # Attach the client-supplied video metadata to the result.
            footprint_data.update({
                "video_info": {
                    "fps": analysis_params.get("video_fps"),
                    "width": analysis_params.get("video_width"),
                    "height": analysis_params.get("video_height"),
                    "scale_length": analysis_params.get("scale_length"),
                    "actual_length": analysis_params.get("actual_length"),
                }
            })

            # Keep a copy of the result under results/<task_id>.
            # NOTE(review): this name is left as the bare timestamp because
            # cleanup_old_results parses it; same-second requests can still
            # overwrite each other's stored copy.
            results_dir = Path("results") / task_id
            results_dir.mkdir(parents=True, exist_ok=True)
            with open(results_dir / "analysis_result.json", 'w') as f:
                json.dump(footprint_data, f)

            visualization_dir = result_dir / "visualization"
            if visualization_dir.exists():
                shutil.copytree(
                    visualization_dir,
                    results_dir / "visualization",
                    dirs_exist_ok=True
                )

            logger.info(f"任务 {task_id} 分析完成")
            return AnalysisResponse(
                status="success",
                message="分析完成",
                data=footprint_data
            )

        finally:
            # Best-effort cleanup: an error raised here would otherwise
            # replace a successful result with a failure response.
            try:
                shutil.rmtree(temp_dir)
            except OSError as cleanup_error:
                logger.warning(f"任务 {task_id} 临时目录清理失败: {cleanup_error}")

    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback and report
        # the failure in the response body.
        logger.exception(f"任务 {task_id} 失败: {str(e)}")
        return AnalysisResponse(
            status="error",
            message=f"分析失败: {str(e)}"
        )
|
|
|
|
|
@app.get("/api/v1/health")
async def health_check():
    """Health probe: always answers with a fixed "healthy" status."""
    payload = {"status": "healthy"}
    return payload
|
|
|
|
|
def cleanup_old_results(max_age_days: int = 1):
    """Delete timestamp-named result directories that are too old.

    Scans the ``results`` directory (relative to the current working
    directory). A sub-directory whose name parses as ``%Y%m%d_%H%M%S`` is
    removed when the whole number of days since that timestamp is greater
    than *max_age_days*. Files, and directories with any other name, are
    left untouched.

    This is best-effort housekeeping: errors are logged, never raised, and a
    failure to remove one directory does not stop the others from being
    processed.

    NOTE(review): earlier documentation of this function described a 7-day
    retention, while the implemented threshold has been ``days > 1``. The
    default keeps the implemented behaviour; pass ``max_age_days=7`` if the
    longer retention is what is wanted.

    Args:
        max_age_days: Directories older than this many whole days are
            deleted. Defaults to 1.
    """
    try:
        results_dir = Path("results")
        if not results_dir.exists():
            return

        current_time = datetime.now()
        for task_dir in results_dir.iterdir():
            if not task_dir.is_dir():
                continue

            # The directory name doubles as the creation timestamp.
            try:
                dir_time = datetime.strptime(task_dir.name, "%Y%m%d_%H%M%S")
            except ValueError:
                # Not a task directory created by this service.
                continue

            if (current_time - dir_time).days <= max_age_days:
                continue

            try:
                shutil.rmtree(task_dir)
            except OSError as e:
                # Keep going: one undeletable directory must not block the
                # cleanup of the remaining ones.
                logger.error(f"清理旧结果时出错: {str(e)}")
                continue
            logger.info(f"清理旧结果: {task_dir}")

    except Exception as e:
        # Housekeeping must never prevent the caller from continuing.
        logger.error(f"清理旧结果时出错: {str(e)}")
|
|
|
|
|
# File stems of the PNG plots returned to the client, in response order.
_GAIT_PLOT_NAMES = (
    "gait_sequence",
    "pressure_timeline",
    "stance_timeline",
    "angular_velocity_timeline",
    "velocity_timeline",
    "tail_lateral_movement",
    "support_swing_phase",
    "limb_duty_cycle",
    "area_timeline",
)


def _image_to_base64(image_path):
    """Return the file at *image_path* as base64 text, or None if unreadable."""
    try:
        with open(image_path, "rb") as img_file:
            return base64.b64encode(img_file.read()).decode('utf-8')
    except OSError as e:
        logger.error(f"图片转换失败 {image_path}: {str(e)}")
        return None


@app.post("/api/v1/analyzeGaitPressure")
async def analyze_gait_pressure(
    video: UploadFile = File(...),
    footprints: str = Form(...),
    record_params: str = Form(...)
):
    """Run the gait/pressure analysis for an uploaded video and footprint data.

    The video and the footprint JSON are written to a private scratch
    directory, a ``GaitAnalysisReport`` is run over them, and the generated
    tables, plots (base64-encoded PNG), 3D visualization and footprint
    timeline are returned in one response.

    Args:
        video: Uploaded video file (multipart form field ``video``).
        footprints: JSON object encoded as a string. Its ``bodyKeypoints``
            entry (default: empty list) is copied into the record
            parameters; the whole object is written to disk for the
            analyzer.
        record_params: JSON object encoded as a string; passed to the
            analyzer with ``bodyKeypoints`` added.

    Returns:
        AnalysisResponse: ``status="success"`` whose ``data`` holds
        ``task_id`` and ``detailed_data`` (``tables``, ``plots``,
        ``3d_visualization``, ``footprint_timeline``). A plot whose image
        file cannot be read has ``data`` set to ``None``.

    Raises:
        HTTPException: 400 when the upload cannot be saved or a form field
            is not a valid JSON object; 500 for any other failure.
    """
    task_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    logger.info(f"开始新的步态压力分析任务 {task_id}")

    try:
        # task_id only has one-second resolution, so the scratch directory
        # gets a unique suffix; otherwise same-second requests would share
        # (and delete) each other's files.
        temp_root = Path("temp")
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"{task_id}_", dir=temp_root))

        try:
            # Stream the upload to disk in small chunks.
            video_path = temp_dir / "input_video.mp4"
            try:
                with open(video_path, "wb") as buffer:
                    while content := await video.read(8192):
                        buffer.write(content)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"视频文件保存失败: {str(e)}") from e

            # Parse the footprint data once and store it for the analyzer.
            json_path = temp_dir / "footprint_data.json"
            try:
                footprint_data = json.loads(footprints)
                with open(json_path, 'w', encoding='utf-8') as f:
                    json.dump(footprint_data, f, ensure_ascii=False)
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"足印数据解析失败: {str(e)}") from e
            if not isinstance(footprint_data, dict):
                raise HTTPException(status_code=400, detail="足印数据解析失败: footprints 必须是 JSON 对象")

            body_keypoints = footprint_data.get('bodyKeypoints', [])
            logger.info(f"提取到 {len(body_keypoints)} 个关键点数据")

            try:
                params_dict = json.loads(record_params)
            except json.JSONDecodeError as e:
                raise HTTPException(status_code=400, detail=f"数据解析错误: {str(e)}") from e
            if not isinstance(params_dict, dict):
                raise HTTPException(status_code=400, detail="数据解析错误: record_params 必须是 JSON 对象")
            params_dict['bodyKeypoints'] = body_keypoints
            logger.info(f"任务 {task_id} 记录参数: {params_dict}")

            # NOTE(review): the analyzer calls below are synchronous and run
            # directly inside this coroutine.
            analyzer = GaitAnalysisReport(
                str(json_path),
                pre_delay_ms=0,
                post_delay_ms=0,
                video_path=str(video_path),
                record_params=params_dict
            )

            # The return values of the analyze_* calls are not used here;
            # the plots are read back from the analyzer's plots folder below.
            logger.info(f"任务 {task_id} 开始分析")
            analyzer.analyze_stance_time()
            analyzer.analyze_gait_sequence()
            analyzer.analyze_pressure_timeline()

            logger.info(f"任务 {task_id} 开始生成足印面积时序图")
            analyzer.analyze_area_timeline()

            logger.info(f"任务 {task_id} 开始生成角速度分析")
            analyzer.analyze_angular_velocity()

            logger.info(f"任务 {task_id} 开始生成速度分析")
            analyzer.analyze_velocity_timeline()

            logger.info(f"任务 {task_id} 开始生成尾根点移动分析")
            analyzer.analyze_tail_lateral_movement()

            logger.info(f"任务 {task_id} 开始生成支撑-摇摆相位分析")
            analyzer.analyze_support_swing_phase()

            logger.info(f"任务 {task_id} 开始生成肢体占空比分析")
            analyzer.analyze_limb_duty_cycle()

            logger.info(f"任务 {task_id} 开始生成3D足印可视化")
            footprint_3d_data = analyzer.generate_3d_footprint_analysis()

            # Embed the interactive 3D index page when the analyzer wrote one.
            index_html_path = Path(analyzer.result_dir) / 'plots' / 'interactive_3d' / 'index.html'
            if index_html_path.exists():
                try:
                    index_base64 = base64.b64encode(index_html_path.read_bytes()).decode('utf-8')
                    footprint_3d_data['index'] = {
                        'html_base64': index_base64,
                        'filename': 'index.html'
                    }
                except Exception as e:
                    # Deliberately best-effort: a missing index page must not
                    # fail the whole analysis.
                    logger.error(f"读取3D索引页面失败: {str(e)}")

            logger.info(f"任务 {task_id} 开始生成足印步行图")
            footprint_timeline_data = analyzer.generate_footprint_timeline()

            detailed_data = analyzer.generate_detailed_table()

            # Read every generated plot back as base64 PNG.
            plots_dir = Path(analyzer.result_dir) / "plots"
            plots = {
                name: {
                    "data": _image_to_base64(plots_dir / f"{name}.png"),
                    "type": "image/png",
                }
                for name in _GAIT_PLOT_NAMES
            }

            response_data = {
                "task_id": task_id,
                "detailed_data": {
                    "tables": {
                        "base_footprint_data": detailed_data.get("base_footprint_data", []),
                        "movement_direction_data": detailed_data.get("movement_direction_data", []),
                        "collection_table": analyzer.generate_collection_table(),
                        "paw_statistics": analyzer.generate_paw_statistics(),
                        "step_sequence": analyzer.generate_step_sequence_table(),
                        "foot_spacing": analyzer.generate_foot_spacing_table(),
                        "support": analyzer.generate_support_table(),
                        "coordination": analyzer.generate_coordination_table(),
                    },
                    "plots": plots,
                    "3d_visualization": footprint_3d_data,
                    "footprint_timeline": footprint_timeline_data
                }
            }

            logger.info(f"任务 {task_id} 分析完成")
            return AnalysisResponse(
                status="success",
                message="分析完成",
                data=response_data
            )

        finally:
            # Best-effort cleanup: an error raised here would otherwise
            # replace a successful result with a 500.
            try:
                shutil.rmtree(temp_dir)
            except OSError as cleanup_error:
                logger.warning(f"任务 {task_id} 临时目录清理失败: {cleanup_error}")

    except HTTPException:
        # Client errors raised above must keep their own status code; the
        # generic handler below would turn them into a 500.
        raise
    except Exception as e:
        logger.exception(f"任务 {task_id} 失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
def main():
    """Prepare the working directories, prune old results and start the server.

    Creates ``temp`` and ``results`` in the current working directory when
    missing, runs ``cleanup_old_results`` once, then starts uvicorn for the
    ``api_server:app`` import string on 0.0.0.0:54321 with auto-reload on.
    """
    for folder in ("temp", "results"):
        Path(folder).mkdir(exist_ok=True)

    cleanup_old_results()

    # NOTE(review): the import string presumably assumes this module is
    # saved as api_server.py; confirm against the deployed file name.
    server_options = {"host": "0.0.0.0", "port": 54321, "reload": True}
    uvicorn.run("api_server:app", **server_options)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()