File size: 11,189 Bytes
0d77f94 09ebca6 8c70a56 1a64f28 8c70a56 95ba2d8 1a64f28 4a3c584 68ee00e 95ba2d8 0d77f94 1a64f28 90a9f80 4ff5df1 8c70a56 4ff5df1 0d77f94 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 4a3c584 bd47a3e 45d9143 bd47a3e 5999a40 4a3c584 3868ea9 8a59194 4a3c584 8c70a56 4a3c584 68ee00e 8c70a56 96c978d 9238041 50cb468 3868ea9 50cb468 9238041 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 8c70a56 09ebca6 03ea674 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 8c70a56 1a64f28 90a9f80 03ea674 90a9f80 1a64f28 8c70a56 1a64f28 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 | from concurrent.futures import process
import gc
import os
import cv2
import numpy as np
from app.llm import ensure_model_face_exists
import mediapipe as mp
from mediapipe.tasks.python import vision
from mediapipe.tasks.python import BaseOptions
from app.config import FRAME_SKIP_FACE_SIGNALS, FACE_MODEL_PATH
from app.logger import log
import psutil, os
processo = psutil.Process(os.getpid())
def verifify_memory_usage(message, logger=None):
log(f"{message}: {processo.memory_info().rss / 1024 ** 2:.2f} MB", level="debug", logger=logger)
def extract_face_signals_facelandmarker(video_path, logger=None):
log(f"Iniciando extração de sinais faciais para {video_path}", logger=logger)
if not video_path or not os.path.isfile(video_path):
msg = f"Arquivo de vídeo não encontrado: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
ensure_model_face_exists(logger=logger)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
msg = f"Erro: não foi possível abrir o vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
# Valida se existe ao menos 1 frame legível no arquivo.
has_frame, first_frame = cap.read()
if not has_frame or first_frame is None:
cap.release()
msg = (
"O arquivo foi aberto, mas não contém frames válidos "
f"(vazio/corrompido): {video_path}"
)
log(msg, level="error", logger=logger)
raise ValueError(msg)
# Reinicia para processar desde o início.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
eye_ratios = []
head_tilts = []
eye_deltas = []
tilt_deltas = []
prev_eye = None
prev_tilt = None
frame_count = 0
frames_lidos = 0
# 🔧 Configurar detector
options = vision.FaceLandmarkerOptions(
base_options=BaseOptions(
model_asset_path=str(FACE_MODEL_PATH),
delegate=BaseOptions.Delegate.CPU, # para garantir compatibilidade ampla com CPU
),
#output_face_blendshapes=False, # para focar apenas em landmarks
output_face_blendshapes=True, # para obter informações de expressões faciais
output_facial_transformation_matrixes=False,
running_mode=vision.RunningMode.VIDEO, # para otimizar para vídeo (manter estado entre frames)
#running_mode=vision.RunningMode.IMAGE, # para evitar problemas de estado entre frames, já que estamos processando de forma mais esparsa e não necessariamente em sequência
num_faces=1 # para focar apenas em um rosto (o mais proeminente), o que é comum em vídeos de pacientes
)
detector = vision.FaceLandmarker.create_from_options(options)
try:
while True:
ret, frame = cap.read()
if not ret:
break
frames_lidos += 1
if frame_count % FRAME_SKIP_FACE_SIGNALS != 0:
frame_count += 1
continue
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# 🔄 Converter para formato MediaPipe
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
# 🔥 alternativa mais precisa de timestamp usando FPS real do vídeo
fps = cap.get(cv2.CAP_PROP_FPS) or 30
timestamp = int(frame_count * (1000 / fps))
try:
log(f"Processando frame {frame_count}", logger=logger)
result = detector.detect(mp_image, timestamp) # usando timestamp real para melhorar a precisão temporal, especialmente se o vídeo tiver variações de FPS
#result = detector.detect(mp_image) # usando detect sem timestamp para evitar problemas de sincronização, já que estamos processando de forma mais esparsa e não necessariamente em sequência
log(f"Frame {frame_count} processado com sucesso", logger=logger)
except Exception as e:
log(f"Erro ao processar frame {frame_count} (timestamp {timestamp}ms): {e}", level="error", logger=logger)
break
if result.face_landmarks:
landmarks = result.face_landmarks[0]
# 👁️ Eye ratio (simples)
# landmarks são objetos com .x .y .z
eye_ratio = abs(landmarks[159].y - landmarks[145].y)
eye_ratios.append(eye_ratio)
# 🧠 Head tilt (diferença entre olhos)
tilt = abs(landmarks[33].y - landmarks[263].y)
head_tilts.append(tilt)
if prev_eye is not None:
eye_deltas.append(abs(eye_ratio - prev_eye))
if prev_tilt is not None:
tilt_deltas.append(abs(tilt - prev_tilt))
prev_eye = eye_ratio
prev_tilt = tilt
frame_count += 1
finally:
cap.release()
detector.close()
del detector
gc.collect()
try:
cv2.destroyAllWindows()
except:
pass
# 🔥 valida leitura real
if frames_lidos == 0:
msg = f"Erro: nenhum frame foi lido do vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise ValueError(msg)
# 📊 MÉDIAS (como você já tinha)
avg_eye = sum(eye_ratios)/len(eye_ratios) if eye_ratios else 0
avg_tilt = sum(head_tilts)/len(head_tilts) if head_tilts else 0
# 🔥 MOVIMENTO MÉDIO
movement_eye = sum(eye_deltas)/len(eye_deltas) if eye_deltas else 0
movement_tilt = sum(tilt_deltas)/len(tilt_deltas) if tilt_deltas else 0
# 🔥 VARIÂNCIA (DESVIO PADRÃO)
eye_std = np.std(eye_ratios) if eye_ratios else 0
tilt_std = np.std(head_tilts) if head_tilts else 0
# 🔥 SCORE FINAL DE AGITAÇÃO
agitation_score = movement_eye + movement_tilt + eye_std + tilt_std
log(f"Eye ratio médio: {avg_eye}", logger=logger)
log(f"Inclinação cabeça: {avg_tilt}", logger=logger)
log(f"Movimento olhos: {movement_eye}", logger=logger)
log(f"Movimento cabeça: {movement_tilt}", logger=logger)
log(f"Desvio padrão olhos: {eye_std}", logger=logger)
log(f"Desvio padrão cabeça: {tilt_std}", logger=logger)
log(f"Score de agitação: {agitation_score}", logger=logger)
log("Extração de sinais faciais concluída", logger=logger)
return {
"eye_ratio": avg_eye,
"head_tilt": avg_tilt,
"movement_eye": movement_eye,
"movement_tilt": movement_tilt,
"eye_std": eye_std,
"tilt_std": tilt_std,
"agitation_score": agitation_score
}
def extract_face_signals_facemesh(video_path, logger=None):
mp_face_mesh = mp.solutions.face_mesh
log(f"[FaceMesh] Iniciando extração de sinais faciais para {video_path}", logger=logger)
if not video_path or not os.path.isfile(video_path):
msg = f"Arquivo de vídeo não encontrado: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
msg = f"Erro: não foi possível abrir o vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
has_frame, first_frame = cap.read()
if not has_frame or first_frame is None:
cap.release()
msg = f"Arquivo inválido ou sem frames: {video_path}"
log(msg, level="error", logger=logger)
raise ValueError(msg)
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
eye_ratios = []
head_tilts = []
eye_deltas = []
tilt_deltas = []
prev_eye = None
prev_tilt = None
frame_count = 0
frames_lidos = 0
# 🔥 FaceMesh (leve e estável)
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=False, # 🔥 manter leve
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
try:
while True:
ret, frame = cap.read()
if not ret:
break
frames_lidos += 1
if frame_count % FRAME_SKIP_FACE_SIGNALS != 0:
frame_count += 1
continue
# 🔥 reduzir resolução (opcional, recomendado)
frame = cv2.resize(frame, (320, 240))
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
try:
log(f"[FaceMesh] Processando frame {frame_count}", logger=logger)
results = face_mesh.process(rgb)
except Exception as e:
log(f"[FaceMesh] Erro no frame {frame_count}: {e}", level="error", logger=logger)
break
if results.multi_face_landmarks:
landmarks = results.multi_face_landmarks[0].landmark
# 👁️ Eye ratio (mesma lógica adaptada)
eye_ratio = abs(landmarks[159].y - landmarks[145].y)
eye_ratios.append(eye_ratio)
# 🧠 Head tilt
tilt = abs(landmarks[33].y - landmarks[263].y)
head_tilts.append(tilt)
if prev_eye is not None:
eye_deltas.append(abs(eye_ratio - prev_eye))
if prev_tilt is not None:
tilt_deltas.append(abs(tilt - prev_tilt))
prev_eye = eye_ratio
prev_tilt = tilt
frame_count += 1
finally:
cap.release()
face_mesh.close()
gc.collect()
try:
cv2.destroyAllWindows()
except:
pass
if frames_lidos == 0:
msg = f"Nenhum frame lido do vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise ValueError(msg)
# 📊 MÉTRICAS (mantidas iguais)
avg_eye = sum(eye_ratios)/len(eye_ratios) if eye_ratios else 0
avg_tilt = sum(head_tilts)/len(head_tilts) if head_tilts else 0
movement_eye = sum(eye_deltas)/len(eye_deltas) if eye_deltas else 0
movement_tilt = sum(tilt_deltas)/len(tilt_deltas) if tilt_deltas else 0
eye_std = np.std(eye_ratios) if eye_ratios else 0
tilt_std = np.std(head_tilts) if head_tilts else 0
agitation_score = movement_eye + movement_tilt + eye_std + tilt_std
log(f"[FaceMesh] Eye ratio médio: {avg_eye}", logger=logger)
log(f"[FaceMesh] Inclinação cabeça: {avg_tilt}", logger=logger)
log(f"[FaceMesh] Movimento olhos: {movement_eye}", logger=logger)
log(f"[FaceMesh] Movimento cabeça: {movement_tilt}", logger=logger)
log(f"[FaceMesh] Desvio padrão olhos: {eye_std}", logger=logger)
log(f"[FaceMesh] Desvio padrão cabeça: {tilt_std}", logger=logger)
log(f"[FaceMesh] Score de agitação: {agitation_score}", logger=logger)
log("[FaceMesh] Extração concluída", logger=logger)
return {
"eye_ratio": avg_eye,
"head_tilt": avg_tilt,
"movement_eye": movement_eye,
"movement_tilt": movement_tilt,
"eye_std": eye_std,
"tilt_std": tilt_std,
"agitation_score": agitation_score
} |