File size: 2,657 Bytes
09ebca6
 
1a64f28
95ba2d8
1a64f28
4a3c584
 
68ee00e
95ba2d8
1a64f28
4ff5df1
 
 
4a3c584
1a64f28
4ff5df1
1a64f28
 
 
 
 
560c910
 
 
 
 
 
 
 
 
 
 
4a3c584
 
560c910
4a3c584
 
 
 
1a64f28
4ff5df1
 
4a3c584
 
 
 
1a64f28
68ee00e
4a3c584
 
1a64f28
4a3c584
 
 
 
1a64f28
4a3c584
1a64f28
4a3c584
 
1a64f28
4a3c584
 
1a64f28
4a3c584
 
 
1a64f28
4a3c584
 
 
1a64f28
 
09ebca6
 
 
03ea674
 
 
 
1a64f28
 
 
4ff5df1
 
1a64f28
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import gc

import cv2
from app.llm import ensure_model_pose_exists
import mediapipe as mp
from mediapipe.tasks.python import vision
from mediapipe.tasks.python import BaseOptions
from app.config import FRAME_SKIP_POSE, POSE_MODEL_PATH
from app.logger import log

def extract_body_signals(video_path, logger=None):
    log(f"Extraindo sinais corporais de {video_path}...", logger=logger)
    ensure_model_pose_exists(logger=logger)

    cap = cv2.VideoCapture(video_path)
    log("Vídeo carregado com sucesso.", logger=logger)

    movements = []
    prev_wrist = None
    frame_count = 0

    # Essa parte é necessária para contornar um problema do MediaPipe que não aceita modelos carregados diretamente do disco.
    # Erro:Unable to open file at C:\Pos_IA\hf_backend\.venv-311\Lib\site-packages/C:\Pos_IA\hf_backend\models\pose_landmarker.task, errno=22
    #options = vision.PoseLandmarkerOptions(
    #    base_options=BaseOptions(model_asset_path=str(POSE_MODEL_PATH)),
    #    num_poses=1
    #)

    log(f"Configurando detector de pose e carregando modelo em {POSE_MODEL_PATH}", logger=logger)
    with open(POSE_MODEL_PATH, "rb") as f:
        model_bytes = f.read()

    # 🔧 Configurar detector
    options = vision.PoseLandmarkerOptions(
        base_options=BaseOptions(model_asset_buffer=model_bytes),
        num_poses=1
    )

    detector = vision.PoseLandmarker.create_from_options(options)

    log("Detector de pose criado com sucesso.", logger=logger)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % FRAME_SKIP_POSE != 0:
            frame_count += 1
            continue

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # converter para MediaPipe Image
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

        result = detector.detect(mp_image)

        if result.pose_landmarks:
            lm = result.pose_landmarks[0]

            # RIGHT_WRIST = índice 16
            wrist = (lm[16].x, lm[16].y)

            if prev_wrist:
                dist = ((wrist[0] - prev_wrist[0])**2 + (wrist[1] - prev_wrist[1])**2)**0.5
                movements.append(dist)

            prev_wrist = wrist

        frame_count += 1

    cap.release()
    detector.close()
    del detector
    gc.collect()
    try:
        cv2.destroyAllWindows()
    except:
        pass

    avg_movement = sum(movements)/len(movements) if movements else 0

    log(f"Movimento corporal: {avg_movement}", logger=logger)
    log("Sinais corporais extraídos com sucesso.", logger=logger)

    return {
        "body_movement": avg_movement
    }