tech_challenge_fase_4_backend / app /pose_processor.py
CLMARRARA's picture
Alterado a maneira de carregar o modelo PoseLandmarker
560c910
import gc
import cv2
from app.llm import ensure_model_pose_exists
import mediapipe as mp
from mediapipe.tasks.python import vision
from mediapipe.tasks.python import BaseOptions
from app.config import FRAME_SKIP_POSE, POSE_MODEL_PATH
from app.logger import log
def extract_body_signals(video_path, logger=None):
    """Estimate how much the subject's right wrist moves throughout a video.

    Frames are sampled every ``FRAME_SKIP_POSE`` frames and run through a
    MediaPipe ``PoseLandmarker``. For each sampled frame with a detected pose,
    the Euclidean distance between the current and the previously detected
    right-wrist position (landmark coordinates as returned by MediaPipe) is
    recorded; the result is the mean of those distances.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.
        logger: Optional logger forwarded to ``log`` and
            ``ensure_model_pose_exists``.

    Returns:
        A dict ``{"body_movement": <mean distance>}``. The value is ``0`` when
        the video cannot be opened or fewer than two sampled frames contain a
        detected pose.
    """
    log(f"Extraindo sinais corporais de {video_path}...", logger=logger)

    ensure_model_pose_exists(logger=logger)

    # A zero (or negative) skip setting would make the modulo below raise
    # ZeroDivisionError; fall back to processing every frame in that case.
    frame_step = max(1, FRAME_SKIP_POSE)

    movements = []
    prev_wrist = None
    detector = None
    cap = cv2.VideoCapture(video_path)

    try:
        # VideoCapture does not raise on a bad path; it must be checked.
        if not cap.isOpened():
            log(f"Não foi possível abrir o vídeo {video_path}.", logger=logger)
            return {"body_movement": 0}

        log("Vídeo carregado com sucesso.", logger=logger)

        # Workaround: loading through ``model_asset_path`` failed because
        # MediaPipe prefixed the absolute model path with the site-packages
        # directory (errno=22). Reading the file ourselves and passing the
        # bytes as a buffer avoids that path handling entirely.
        log(f"Configurando detector de pose e carregando modelo em {POSE_MODEL_PATH}", logger=logger)
        with open(POSE_MODEL_PATH, "rb") as f:
            model_bytes = f.read()

        options = vision.PoseLandmarkerOptions(
            base_options=BaseOptions(model_asset_buffer=model_bytes),
            num_poses=1,
        )
        detector = vision.PoseLandmarker.create_from_options(options)
        log("Detector de pose criado com sucesso.", logger=logger)

        frame_index = -1
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_index += 1
            if frame_index % frame_step != 0:
                continue

            # OpenCV decodes frames as BGR; MediaPipe expects RGB.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
            result = detector.detect(mp_image)

            if not result.pose_landmarks:
                continue

            lm = result.pose_landmarks[0]
            # Index 16 is the RIGHT_WRIST landmark.
            wrist = (lm[16].x, lm[16].y)

            if prev_wrist is not None:
                dist = ((wrist[0] - prev_wrist[0])**2 + (wrist[1] - prev_wrist[1])**2)**0.5
                movements.append(dist)

            prev_wrist = wrist
    finally:
        # Always release native resources, even if reading the model,
        # creating the detector or processing a frame raised.
        cap.release()
        if detector is not None:
            detector.close()
            detector = None
        gc.collect()

        try:
            cv2.destroyAllWindows()
        except cv2.error:
            # Headless OpenCV builds have no GUI backend; nothing to destroy.
            pass

    avg_movement = sum(movements)/len(movements) if movements else 0

    log(f"Movimento corporal: {avg_movement}", logger=logger)
    log("Sinais corporais extraídos com sucesso.", logger=logger)

    return {
        "body_movement": avg_movement
    }