tech_challenge_fase_4_backend / app /mediapipe_processor.py
CLMARRARA's picture
tratamento do cv2.destroyAllWindows() para local (no HF funciona)
03ea674
from concurrent.futures import process
import gc
import os
import cv2
import numpy as np
from app.llm import ensure_model_face_exists
import mediapipe as mp
from mediapipe.tasks.python import vision
from mediapipe.tasks.python import BaseOptions
from app.config import FRAME_SKIP_FACE_SIGNALS, FACE_MODEL_PATH
from app.logger import log
import psutil, os
processo = psutil.Process(os.getpid())
def verifify_memory_usage(message, logger=None):
log(f"{message}: {processo.memory_info().rss / 1024 ** 2:.2f} MB", level="debug", logger=logger)
def extract_face_signals_facelandmarker(video_path, logger=None):
log(f"Iniciando extração de sinais faciais para {video_path}", logger=logger)
if not video_path or not os.path.isfile(video_path):
msg = f"Arquivo de vídeo não encontrado: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
ensure_model_face_exists(logger=logger)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
msg = f"Erro: não foi possível abrir o vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
# Valida se existe ao menos 1 frame legível no arquivo.
has_frame, first_frame = cap.read()
if not has_frame or first_frame is None:
cap.release()
msg = (
"O arquivo foi aberto, mas não contém frames válidos "
f"(vazio/corrompido): {video_path}"
)
log(msg, level="error", logger=logger)
raise ValueError(msg)
# Reinicia para processar desde o início.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
eye_ratios = []
head_tilts = []
eye_deltas = []
tilt_deltas = []
prev_eye = None
prev_tilt = None
frame_count = 0
frames_lidos = 0
# 🔧 Configurar detector
options = vision.FaceLandmarkerOptions(
base_options=BaseOptions(
model_asset_path=str(FACE_MODEL_PATH),
delegate=BaseOptions.Delegate.CPU, # para garantir compatibilidade ampla com CPU
),
#output_face_blendshapes=False, # para focar apenas em landmarks
output_face_blendshapes=True, # para obter informações de expressões faciais
output_facial_transformation_matrixes=False,
running_mode=vision.RunningMode.VIDEO, # para otimizar para vídeo (manter estado entre frames)
#running_mode=vision.RunningMode.IMAGE, # para evitar problemas de estado entre frames, já que estamos processando de forma mais esparsa e não necessariamente em sequência
num_faces=1 # para focar apenas em um rosto (o mais proeminente), o que é comum em vídeos de pacientes
)
detector = vision.FaceLandmarker.create_from_options(options)
try:
while True:
ret, frame = cap.read()
if not ret:
break
frames_lidos += 1
if frame_count % FRAME_SKIP_FACE_SIGNALS != 0:
frame_count += 1
continue
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# 🔄 Converter para formato MediaPipe
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
# 🔥 alternativa mais precisa de timestamp usando FPS real do vídeo
fps = cap.get(cv2.CAP_PROP_FPS) or 30
timestamp = int(frame_count * (1000 / fps))
try:
log(f"Processando frame {frame_count}", logger=logger)
result = detector.detect(mp_image, timestamp) # usando timestamp real para melhorar a precisão temporal, especialmente se o vídeo tiver variações de FPS
#result = detector.detect(mp_image) # usando detect sem timestamp para evitar problemas de sincronização, já que estamos processando de forma mais esparsa e não necessariamente em sequência
log(f"Frame {frame_count} processado com sucesso", logger=logger)
except Exception as e:
log(f"Erro ao processar frame {frame_count} (timestamp {timestamp}ms): {e}", level="error", logger=logger)
break
if result.face_landmarks:
landmarks = result.face_landmarks[0]
# 👁️ Eye ratio (simples)
# landmarks são objetos com .x .y .z
eye_ratio = abs(landmarks[159].y - landmarks[145].y)
eye_ratios.append(eye_ratio)
# 🧠 Head tilt (diferença entre olhos)
tilt = abs(landmarks[33].y - landmarks[263].y)
head_tilts.append(tilt)
if prev_eye is not None:
eye_deltas.append(abs(eye_ratio - prev_eye))
if prev_tilt is not None:
tilt_deltas.append(abs(tilt - prev_tilt))
prev_eye = eye_ratio
prev_tilt = tilt
frame_count += 1
finally:
cap.release()
detector.close()
del detector
gc.collect()
try:
cv2.destroyAllWindows()
except:
pass
# 🔥 valida leitura real
if frames_lidos == 0:
msg = f"Erro: nenhum frame foi lido do vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise ValueError(msg)
# 📊 MÉDIAS (como você já tinha)
avg_eye = sum(eye_ratios)/len(eye_ratios) if eye_ratios else 0
avg_tilt = sum(head_tilts)/len(head_tilts) if head_tilts else 0
# 🔥 MOVIMENTO MÉDIO
movement_eye = sum(eye_deltas)/len(eye_deltas) if eye_deltas else 0
movement_tilt = sum(tilt_deltas)/len(tilt_deltas) if tilt_deltas else 0
# 🔥 VARIÂNCIA (DESVIO PADRÃO)
eye_std = np.std(eye_ratios) if eye_ratios else 0
tilt_std = np.std(head_tilts) if head_tilts else 0
# 🔥 SCORE FINAL DE AGITAÇÃO
agitation_score = movement_eye + movement_tilt + eye_std + tilt_std
log(f"Eye ratio médio: {avg_eye}", logger=logger)
log(f"Inclinação cabeça: {avg_tilt}", logger=logger)
log(f"Movimento olhos: {movement_eye}", logger=logger)
log(f"Movimento cabeça: {movement_tilt}", logger=logger)
log(f"Desvio padrão olhos: {eye_std}", logger=logger)
log(f"Desvio padrão cabeça: {tilt_std}", logger=logger)
log(f"Score de agitação: {agitation_score}", logger=logger)
log("Extração de sinais faciais concluída", logger=logger)
return {
"eye_ratio": avg_eye,
"head_tilt": avg_tilt,
"movement_eye": movement_eye,
"movement_tilt": movement_tilt,
"eye_std": eye_std,
"tilt_std": tilt_std,
"agitation_score": agitation_score
}
def extract_face_signals_facemesh(video_path, logger=None):
mp_face_mesh = mp.solutions.face_mesh
log(f"[FaceMesh] Iniciando extração de sinais faciais para {video_path}", logger=logger)
if not video_path or not os.path.isfile(video_path):
msg = f"Arquivo de vídeo não encontrado: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
msg = f"Erro: não foi possível abrir o vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise FileNotFoundError(msg)
has_frame, first_frame = cap.read()
if not has_frame or first_frame is None:
cap.release()
msg = f"Arquivo inválido ou sem frames: {video_path}"
log(msg, level="error", logger=logger)
raise ValueError(msg)
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
eye_ratios = []
head_tilts = []
eye_deltas = []
tilt_deltas = []
prev_eye = None
prev_tilt = None
frame_count = 0
frames_lidos = 0
# 🔥 FaceMesh (leve e estável)
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=False, # 🔥 manter leve
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
try:
while True:
ret, frame = cap.read()
if not ret:
break
frames_lidos += 1
if frame_count % FRAME_SKIP_FACE_SIGNALS != 0:
frame_count += 1
continue
# 🔥 reduzir resolução (opcional, recomendado)
frame = cv2.resize(frame, (320, 240))
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
try:
log(f"[FaceMesh] Processando frame {frame_count}", logger=logger)
results = face_mesh.process(rgb)
except Exception as e:
log(f"[FaceMesh] Erro no frame {frame_count}: {e}", level="error", logger=logger)
break
if results.multi_face_landmarks:
landmarks = results.multi_face_landmarks[0].landmark
# 👁️ Eye ratio (mesma lógica adaptada)
eye_ratio = abs(landmarks[159].y - landmarks[145].y)
eye_ratios.append(eye_ratio)
# 🧠 Head tilt
tilt = abs(landmarks[33].y - landmarks[263].y)
head_tilts.append(tilt)
if prev_eye is not None:
eye_deltas.append(abs(eye_ratio - prev_eye))
if prev_tilt is not None:
tilt_deltas.append(abs(tilt - prev_tilt))
prev_eye = eye_ratio
prev_tilt = tilt
frame_count += 1
finally:
cap.release()
face_mesh.close()
gc.collect()
try:
cv2.destroyAllWindows()
except:
pass
if frames_lidos == 0:
msg = f"Nenhum frame lido do vídeo: {video_path}"
log(msg, level="error", logger=logger)
raise ValueError(msg)
# 📊 MÉTRICAS (mantidas iguais)
avg_eye = sum(eye_ratios)/len(eye_ratios) if eye_ratios else 0
avg_tilt = sum(head_tilts)/len(head_tilts) if head_tilts else 0
movement_eye = sum(eye_deltas)/len(eye_deltas) if eye_deltas else 0
movement_tilt = sum(tilt_deltas)/len(tilt_deltas) if tilt_deltas else 0
eye_std = np.std(eye_ratios) if eye_ratios else 0
tilt_std = np.std(head_tilts) if head_tilts else 0
agitation_score = movement_eye + movement_tilt + eye_std + tilt_std
log(f"[FaceMesh] Eye ratio médio: {avg_eye}", logger=logger)
log(f"[FaceMesh] Inclinação cabeça: {avg_tilt}", logger=logger)
log(f"[FaceMesh] Movimento olhos: {movement_eye}", logger=logger)
log(f"[FaceMesh] Movimento cabeça: {movement_tilt}", logger=logger)
log(f"[FaceMesh] Desvio padrão olhos: {eye_std}", logger=logger)
log(f"[FaceMesh] Desvio padrão cabeça: {tilt_std}", logger=logger)
log(f"[FaceMesh] Score de agitação: {agitation_score}", logger=logger)
log("[FaceMesh] Extração concluída", logger=logger)
return {
"eye_ratio": avg_eye,
"head_tilt": avg_tilt,
"movement_eye": movement_eye,
"movement_tilt": movement_tilt,
"eye_std": eye_std,
"tilt_std": tilt_std,
"agitation_score": agitation_score
}