from concurrent.futures import process import gc import os import cv2 import numpy as np from app.llm import ensure_model_face_exists import mediapipe as mp from mediapipe.tasks.python import vision from mediapipe.tasks.python import BaseOptions from app.config import FRAME_SKIP_FACE_SIGNALS, FACE_MODEL_PATH from app.logger import log import psutil, os processo = psutil.Process(os.getpid()) def verifify_memory_usage(message, logger=None): log(f"{message}: {processo.memory_info().rss / 1024 ** 2:.2f} MB", level="debug", logger=logger) def extract_face_signals_facelandmarker(video_path, logger=None): log(f"Iniciando extração de sinais faciais para {video_path}", logger=logger) if not video_path or not os.path.isfile(video_path): msg = f"Arquivo de vídeo não encontrado: {video_path}" log(msg, level="error", logger=logger) raise FileNotFoundError(msg) ensure_model_face_exists(logger=logger) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): msg = f"Erro: não foi possível abrir o vídeo: {video_path}" log(msg, level="error", logger=logger) raise FileNotFoundError(msg) # Valida se existe ao menos 1 frame legível no arquivo. has_frame, first_frame = cap.read() if not has_frame or first_frame is None: cap.release() msg = ( "O arquivo foi aberto, mas não contém frames válidos " f"(vazio/corrompido): {video_path}" ) log(msg, level="error", logger=logger) raise ValueError(msg) # Reinicia para processar desde o início. cap.set(cv2.CAP_PROP_POS_FRAMES, 0) eye_ratios = [] head_tilts = [] eye_deltas = [] tilt_deltas = [] prev_eye = None prev_tilt = None frame_count = 0 frames_lidos = 0 # 🔧 Configurar detector options = vision.FaceLandmarkerOptions( base_options=BaseOptions( model_asset_path=str(FACE_MODEL_PATH), delegate=BaseOptions.Delegate.CPU, # para garantir compatibilidade ampla com CPU ), #output_face_blendshapes=False, # para focar apenas em landmarks output_face_blendshapes=True, # para obter informações de expressões faciais output_facial_transformation_matrixes=False, running_mode=vision.RunningMode.VIDEO, # para otimizar para vídeo (manter estado entre frames) #running_mode=vision.RunningMode.IMAGE, # para evitar problemas de estado entre frames, já que estamos processando de forma mais esparsa e não necessariamente em sequência num_faces=1 # para focar apenas em um rosto (o mais proeminente), o que é comum em vídeos de pacientes ) detector = vision.FaceLandmarker.create_from_options(options) try: while True: ret, frame = cap.read() if not ret: break frames_lidos += 1 if frame_count % FRAME_SKIP_FACE_SIGNALS != 0: frame_count += 1 continue rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # 🔄 Converter para formato MediaPipe mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb) # 🔥 alternativa mais precisa de timestamp usando FPS real do vídeo fps = cap.get(cv2.CAP_PROP_FPS) or 30 timestamp = int(frame_count * (1000 / fps)) try: log(f"Processando frame {frame_count}", logger=logger) result = detector.detect(mp_image, timestamp) # usando timestamp real para melhorar a precisão temporal, especialmente se o vídeo tiver variações de FPS #result = detector.detect(mp_image) # usando detect sem timestamp para evitar problemas de sincronização, já que estamos processando de forma mais esparsa e não necessariamente em sequência log(f"Frame {frame_count} processado com sucesso", logger=logger) except Exception as e: log(f"Erro ao processar frame {frame_count} (timestamp {timestamp}ms): {e}", level="error", logger=logger) break if result.face_landmarks: landmarks = result.face_landmarks[0] # 👁️ Eye ratio (simples) # landmarks são objetos com .x .y .z eye_ratio = abs(landmarks[159].y - landmarks[145].y) eye_ratios.append(eye_ratio) # 🧠 Head tilt (diferença entre olhos) tilt = abs(landmarks[33].y - landmarks[263].y) head_tilts.append(tilt) if prev_eye is not None: eye_deltas.append(abs(eye_ratio - prev_eye)) if prev_tilt is not None: tilt_deltas.append(abs(tilt - prev_tilt)) prev_eye = eye_ratio prev_tilt = tilt frame_count += 1 finally: cap.release() detector.close() del detector gc.collect() try: cv2.destroyAllWindows() except: pass # 🔥 valida leitura real if frames_lidos == 0: msg = f"Erro: nenhum frame foi lido do vídeo: {video_path}" log(msg, level="error", logger=logger) raise ValueError(msg) # 📊 MÉDIAS (como você já tinha) avg_eye = sum(eye_ratios)/len(eye_ratios) if eye_ratios else 0 avg_tilt = sum(head_tilts)/len(head_tilts) if head_tilts else 0 # 🔥 MOVIMENTO MÉDIO movement_eye = sum(eye_deltas)/len(eye_deltas) if eye_deltas else 0 movement_tilt = sum(tilt_deltas)/len(tilt_deltas) if tilt_deltas else 0 # 🔥 VARIÂNCIA (DESVIO PADRÃO) eye_std = np.std(eye_ratios) if eye_ratios else 0 tilt_std = np.std(head_tilts) if head_tilts else 0 # 🔥 SCORE FINAL DE AGITAÇÃO agitation_score = movement_eye + movement_tilt + eye_std + tilt_std log(f"Eye ratio médio: {avg_eye}", logger=logger) log(f"Inclinação cabeça: {avg_tilt}", logger=logger) log(f"Movimento olhos: {movement_eye}", logger=logger) log(f"Movimento cabeça: {movement_tilt}", logger=logger) log(f"Desvio padrão olhos: {eye_std}", logger=logger) log(f"Desvio padrão cabeça: {tilt_std}", logger=logger) log(f"Score de agitação: {agitation_score}", logger=logger) log("Extração de sinais faciais concluída", logger=logger) return { "eye_ratio": avg_eye, "head_tilt": avg_tilt, "movement_eye": movement_eye, "movement_tilt": movement_tilt, "eye_std": eye_std, "tilt_std": tilt_std, "agitation_score": agitation_score } def extract_face_signals_facemesh(video_path, logger=None): mp_face_mesh = mp.solutions.face_mesh log(f"[FaceMesh] Iniciando extração de sinais faciais para {video_path}", logger=logger) if not video_path or not os.path.isfile(video_path): msg = f"Arquivo de vídeo não encontrado: {video_path}" log(msg, level="error", logger=logger) raise FileNotFoundError(msg) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): msg = f"Erro: não foi possível abrir o vídeo: {video_path}" log(msg, level="error", logger=logger) raise FileNotFoundError(msg) has_frame, first_frame = cap.read() if not has_frame or first_frame is None: cap.release() msg = f"Arquivo inválido ou sem frames: {video_path}" log(msg, level="error", logger=logger) raise ValueError(msg) cap.set(cv2.CAP_PROP_POS_FRAMES, 0) eye_ratios = [] head_tilts = [] eye_deltas = [] tilt_deltas = [] prev_eye = None prev_tilt = None frame_count = 0 frames_lidos = 0 # 🔥 FaceMesh (leve e estável) face_mesh = mp_face_mesh.FaceMesh( static_image_mode=False, max_num_faces=1, refine_landmarks=False, # 🔥 manter leve min_detection_confidence=0.5, min_tracking_confidence=0.5 ) try: while True: ret, frame = cap.read() if not ret: break frames_lidos += 1 if frame_count % FRAME_SKIP_FACE_SIGNALS != 0: frame_count += 1 continue # 🔥 reduzir resolução (opcional, recomendado) frame = cv2.resize(frame, (320, 240)) rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) try: log(f"[FaceMesh] Processando frame {frame_count}", logger=logger) results = face_mesh.process(rgb) except Exception as e: log(f"[FaceMesh] Erro no frame {frame_count}: {e}", level="error", logger=logger) break if results.multi_face_landmarks: landmarks = results.multi_face_landmarks[0].landmark # 👁️ Eye ratio (mesma lógica adaptada) eye_ratio = abs(landmarks[159].y - landmarks[145].y) eye_ratios.append(eye_ratio) # 🧠 Head tilt tilt = abs(landmarks[33].y - landmarks[263].y) head_tilts.append(tilt) if prev_eye is not None: eye_deltas.append(abs(eye_ratio - prev_eye)) if prev_tilt is not None: tilt_deltas.append(abs(tilt - prev_tilt)) prev_eye = eye_ratio prev_tilt = tilt frame_count += 1 finally: cap.release() face_mesh.close() gc.collect() try: cv2.destroyAllWindows() except: pass if frames_lidos == 0: msg = f"Nenhum frame lido do vídeo: {video_path}" log(msg, level="error", logger=logger) raise ValueError(msg) # 📊 MÉTRICAS (mantidas iguais) avg_eye = sum(eye_ratios)/len(eye_ratios) if eye_ratios else 0 avg_tilt = sum(head_tilts)/len(head_tilts) if head_tilts else 0 movement_eye = sum(eye_deltas)/len(eye_deltas) if eye_deltas else 0 movement_tilt = sum(tilt_deltas)/len(tilt_deltas) if tilt_deltas else 0 eye_std = np.std(eye_ratios) if eye_ratios else 0 tilt_std = np.std(head_tilts) if head_tilts else 0 agitation_score = movement_eye + movement_tilt + eye_std + tilt_std log(f"[FaceMesh] Eye ratio médio: {avg_eye}", logger=logger) log(f"[FaceMesh] Inclinação cabeça: {avg_tilt}", logger=logger) log(f"[FaceMesh] Movimento olhos: {movement_eye}", logger=logger) log(f"[FaceMesh] Movimento cabeça: {movement_tilt}", logger=logger) log(f"[FaceMesh] Desvio padrão olhos: {eye_std}", logger=logger) log(f"[FaceMesh] Desvio padrão cabeça: {tilt_std}", logger=logger) log(f"[FaceMesh] Score de agitação: {agitation_score}", logger=logger) log("[FaceMesh] Extração concluída", logger=logger) return { "eye_ratio": avg_eye, "head_tilt": avg_tilt, "movement_eye": movement_eye, "movement_tilt": movement_tilt, "eye_std": eye_std, "tilt_std": tilt_std, "agitation_score": agitation_score }