| from concurrent.futures import process |
| import gc |
| import os |
| import cv2 |
| import numpy as np |
| from app.llm import ensure_model_face_exists |
| import mediapipe as mp |
| from mediapipe.tasks.python import vision |
| from mediapipe.tasks.python import BaseOptions |
| from app.config import FRAME_SKIP_FACE_SIGNALS, FACE_MODEL_PATH |
| from app.logger import log |
| import psutil, os |
# Handle to the running interpreter process; verifify_memory_usage reads its
# resident set size from this object when emitting memory debug logs.
processo = psutil.Process(pid=os.getpid())
|
|
def verifify_memory_usage(message, logger=None):
    """Log the current process's resident memory, prefixed by *message*.

    The resident set size (RSS) is read from the module-level ``processo``
    handle, divided by 1024 ** 2 and written at debug level with two decimals.

    Args:
        message: Text placed before the memory figure in the log line.
        logger: Optional logger forwarded unchanged to ``log``.
    """
    rss_bytes = processo.memory_info().rss
    rss_megabytes = rss_bytes / 1024 ** 2
    log(f"{message}: {rss_megabytes:.2f} MB", level="debug", logger=logger)
|
|
def extract_face_signals_facelandmarker(video_path, logger=None):
    """Extract aggregate eye/head movement signals using MediaPipe FaceLandmarker.

    Every ``FRAME_SKIP_FACE_SIGNALS``-th frame of the video is converted to
    RGB and passed to a ``FaceLandmarker`` running in VIDEO mode. For each
    processed frame in which a face is returned, two measurements are taken
    from the first face's landmarks:

    * ``eye_ratio`` = ``|y[159] - y[145]|``
    * ``tilt``      = ``|y[33] - y[263]|``

    NOTE(review): indices 159/145 are presumably upper/lower eyelid points and
    33/263 the outer eye corners in the MediaPipe face-mesh topology - confirm
    against the landmark map.

    Args:
        video_path: Path to the video file to analyse.
        logger: Optional logger forwarded to ``log``.

    Returns:
        dict with the keys ``eye_ratio`` and ``head_tilt`` (means of the
        per-frame measurements), ``movement_eye`` and ``movement_tilt`` (mean
        absolute change between consecutive measured frames), ``eye_std`` and
        ``tilt_std`` (standard deviations) and ``agitation_score`` (sum of the
        two movement values and the two standard deviations). Every value is
        0 when no face was measured.

    Raises:
        FileNotFoundError: If ``video_path`` is empty, is not a file, or
            cannot be opened by OpenCV.
        ValueError: If the video opens but yields no readable frame.
    """
    log(f"Iniciando extração de sinais faciais para {video_path}", logger=logger)

    if not video_path or not os.path.isfile(video_path):
        msg = f"Arquivo de vídeo não encontrado: {video_path}"
        log(msg, level="error", logger=logger)
        raise FileNotFoundError(msg)

    ensure_model_face_exists(logger=logger)

    eye_ratios = []
    head_tilts = []
    eye_deltas = []
    tilt_deltas = []
    prev_eye = None
    prev_tilt = None
    frame_count = 0
    frames_lidos = 0

    cap = cv2.VideoCapture(video_path)
    detector = None

    # Everything that can raise after the capture exists lives inside this
    # try, so the capture (and the detector, once created) is always released.
    try:
        if not cap.isOpened():
            msg = f"Erro: não foi possível abrir o vídeo: {video_path}"
            log(msg, level="error", logger=logger)
            raise FileNotFoundError(msg)

        # Probe one frame up front so empty/corrupt files fail fast.
        has_frame, first_frame = cap.read()
        if not has_frame or first_frame is None:
            msg = (
                "O arquivo foi aberto, mas não contém frames válidos "
                f"(vazio/corrompido): {video_path}"
            )
            log(msg, level="error", logger=logger)
            raise ValueError(msg)

        # Rewind so the probed frame is processed by the main loop as well.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        # The frame rate is constant for the whole capture: read it once.
        # "not fps > 0" also catches 0, negative and NaN values.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30
        ms_per_frame = 1000 / fps
        last_timestamp = -1

        options = vision.FaceLandmarkerOptions(
            base_options=BaseOptions(
                model_asset_path=str(FACE_MODEL_PATH),
                delegate=BaseOptions.Delegate.CPU,
            ),
            output_face_blendshapes=True,
            output_facial_transformation_matrixes=False,
            running_mode=vision.RunningMode.VIDEO,
            num_faces=1,
        )
        detector = vision.FaceLandmarker.create_from_options(options)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frames_lidos += 1

            # Only every FRAME_SKIP_FACE_SIGNALS-th frame is analysed.
            if frame_count % FRAME_SKIP_FACE_SIGNALS != 0:
                frame_count += 1
                continue

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

            # VIDEO mode needs increasing timestamps; the max() keeps them
            # strictly increasing even if truncation would repeat a value.
            timestamp = max(int(frame_count * ms_per_frame), last_timestamp + 1)

            try:
                log(f"Processando frame {frame_count}", logger=logger)
                # A detector created with RunningMode.VIDEO must be driven
                # through detect_for_video(); detect() is the IMAGE-mode entry
                # point and does not accept a timestamp.
                result = detector.detect_for_video(mp_image, timestamp)
                log(f"Frame {frame_count} processado com sucesso", logger=logger)
            except Exception as e:
                log(
                    f"Erro ao processar frame {frame_count} (timestamp {timestamp}ms): {e}",
                    level="error",
                    logger=logger,
                )
                break

            last_timestamp = timestamp

            if result.face_landmarks:
                landmarks = result.face_landmarks[0]

                eye_ratio = abs(landmarks[159].y - landmarks[145].y)
                eye_ratios.append(eye_ratio)

                tilt = abs(landmarks[33].y - landmarks[263].y)
                head_tilts.append(tilt)

                if prev_eye is not None:
                    eye_deltas.append(abs(eye_ratio - prev_eye))

                if prev_tilt is not None:
                    tilt_deltas.append(abs(tilt - prev_tilt))

                prev_eye = eye_ratio
                prev_tilt = tilt

            frame_count += 1
    finally:
        cap.release()
        if detector is not None:
            detector.close()
            detector = None
        gc.collect()
        try:
            cv2.destroyAllWindows()
        except cv2.error:
            # Headless OpenCV builds have no GUI backend; nothing to destroy.
            pass

    if frames_lidos == 0:
        msg = f"Erro: nenhum frame foi lido do vídeo: {video_path}"
        log(msg, level="error", logger=logger)
        raise ValueError(msg)

    avg_eye = sum(eye_ratios) / len(eye_ratios) if eye_ratios else 0
    avg_tilt = sum(head_tilts) / len(head_tilts) if head_tilts else 0

    movement_eye = sum(eye_deltas) / len(eye_deltas) if eye_deltas else 0
    movement_tilt = sum(tilt_deltas) / len(tilt_deltas) if tilt_deltas else 0

    eye_std = np.std(eye_ratios) if eye_ratios else 0
    tilt_std = np.std(head_tilts) if head_tilts else 0

    agitation_score = movement_eye + movement_tilt + eye_std + tilt_std

    log(f"Eye ratio médio: {avg_eye}", logger=logger)
    log(f"Inclinação cabeça: {avg_tilt}", logger=logger)

    log(f"Movimento olhos: {movement_eye}", logger=logger)
    log(f"Movimento cabeça: {movement_tilt}", logger=logger)

    log(f"Desvio padrão olhos: {eye_std}", logger=logger)
    log(f"Desvio padrão cabeça: {tilt_std}", logger=logger)

    log(f"Score de agitação: {agitation_score}", logger=logger)

    log("Extração de sinais faciais concluída", logger=logger)

    return {
        "eye_ratio": avg_eye,
        "head_tilt": avg_tilt,
        "movement_eye": movement_eye,
        "movement_tilt": movement_tilt,
        "eye_std": eye_std,
        "tilt_std": tilt_std,
        "agitation_score": agitation_score,
    }
|
|
def extract_face_signals_facemesh(video_path, logger=None):
    """Extract aggregate eye/head movement signals using MediaPipe FaceMesh.

    Every ``FRAME_SKIP_FACE_SIGNALS``-th frame of the video is resized to
    320x240, converted to RGB and processed by the ``FaceMesh`` solution. For
    each processed frame in which a face is returned, two measurements are
    taken from the first face's landmarks:

    * ``eye_ratio`` = ``|y[159] - y[145]|``
    * ``tilt``      = ``|y[33] - y[263]|``

    NOTE(review): indices 159/145 are presumably upper/lower eyelid points and
    33/263 the outer eye corners in the MediaPipe face-mesh topology - confirm
    against the landmark map.

    Args:
        video_path: Path to the video file to analyse.
        logger: Optional logger forwarded to ``log``.

    Returns:
        dict with the keys ``eye_ratio`` and ``head_tilt`` (means of the
        per-frame measurements), ``movement_eye`` and ``movement_tilt`` (mean
        absolute change between consecutive measured frames), ``eye_std`` and
        ``tilt_std`` (standard deviations) and ``agitation_score`` (sum of the
        two movement values and the two standard deviations). Every value is
        0 when no face was measured.

    Raises:
        FileNotFoundError: If ``video_path`` is empty, is not a file, or
            cannot be opened by OpenCV.
        ValueError: If the video opens but yields no readable frame.
    """
    mp_face_mesh = mp.solutions.face_mesh

    log(f"[FaceMesh] Iniciando extração de sinais faciais para {video_path}", logger=logger)

    if not video_path or not os.path.isfile(video_path):
        msg = f"Arquivo de vídeo não encontrado: {video_path}"
        log(msg, level="error", logger=logger)
        raise FileNotFoundError(msg)

    eye_ratios = []
    head_tilts = []
    eye_deltas = []
    tilt_deltas = []

    prev_eye = None
    prev_tilt = None

    frame_count = 0
    frames_lidos = 0

    cap = cv2.VideoCapture(video_path)
    face_mesh = None

    # Everything that can raise after the capture exists lives inside this
    # try, so the capture (and the FaceMesh graph, once created) is always
    # released.
    try:
        if not cap.isOpened():
            msg = f"Erro: não foi possível abrir o vídeo: {video_path}"
            log(msg, level="error", logger=logger)
            raise FileNotFoundError(msg)

        # Probe one frame up front so empty/corrupt files fail fast.
        has_frame, first_frame = cap.read()
        if not has_frame or first_frame is None:
            msg = f"Arquivo inválido ou sem frames: {video_path}"
            log(msg, level="error", logger=logger)
            raise ValueError(msg)

        # Rewind so the probed frame is processed by the main loop as well.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        face_mesh = mp_face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frames_lidos += 1

            # Only every FRAME_SKIP_FACE_SIGNALS-th frame is analysed.
            if frame_count % FRAME_SKIP_FACE_SIGNALS != 0:
                frame_count += 1
                continue

            # Downscale before inference to reduce per-frame work.
            frame = cv2.resize(frame, (320, 240))
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            try:
                log(f"[FaceMesh] Processando frame {frame_count}", logger=logger)
                results = face_mesh.process(rgb)
            except Exception as e:
                log(f"[FaceMesh] Erro no frame {frame_count}: {e}", level="error", logger=logger)
                break

            if results.multi_face_landmarks:
                landmarks = results.multi_face_landmarks[0].landmark

                eye_ratio = abs(landmarks[159].y - landmarks[145].y)
                eye_ratios.append(eye_ratio)

                tilt = abs(landmarks[33].y - landmarks[263].y)
                head_tilts.append(tilt)

                if prev_eye is not None:
                    eye_deltas.append(abs(eye_ratio - prev_eye))

                if prev_tilt is not None:
                    tilt_deltas.append(abs(tilt - prev_tilt))

                prev_eye = eye_ratio
                prev_tilt = tilt

            frame_count += 1
    finally:
        cap.release()
        if face_mesh is not None:
            face_mesh.close()
        gc.collect()
        try:
            cv2.destroyAllWindows()
        except cv2.error:
            # Headless OpenCV builds have no GUI backend; nothing to destroy.
            pass

    if frames_lidos == 0:
        msg = f"Nenhum frame lido do vídeo: {video_path}"
        log(msg, level="error", logger=logger)
        raise ValueError(msg)

    avg_eye = sum(eye_ratios) / len(eye_ratios) if eye_ratios else 0
    avg_tilt = sum(head_tilts) / len(head_tilts) if head_tilts else 0

    movement_eye = sum(eye_deltas) / len(eye_deltas) if eye_deltas else 0
    movement_tilt = sum(tilt_deltas) / len(tilt_deltas) if tilt_deltas else 0

    eye_std = np.std(eye_ratios) if eye_ratios else 0
    tilt_std = np.std(head_tilts) if head_tilts else 0

    agitation_score = movement_eye + movement_tilt + eye_std + tilt_std

    log(f"[FaceMesh] Eye ratio médio: {avg_eye}", logger=logger)
    log(f"[FaceMesh] Inclinação cabeça: {avg_tilt}", logger=logger)

    log(f"[FaceMesh] Movimento olhos: {movement_eye}", logger=logger)
    log(f"[FaceMesh] Movimento cabeça: {movement_tilt}", logger=logger)

    log(f"[FaceMesh] Desvio padrão olhos: {eye_std}", logger=logger)
    log(f"[FaceMesh] Desvio padrão cabeça: {tilt_std}", logger=logger)

    log(f"[FaceMesh] Score de agitação: {agitation_score}", logger=logger)

    log("[FaceMesh] Extração concluída", logger=logger)

    return {
        "eye_ratio": avg_eye,
        "head_tilt": avg_tilt,
        "movement_eye": movement_eye,
        "movement_tilt": movement_tilt,
        "eye_std": eye_std,
        "tilt_std": tilt_std,
        "agitation_score": agitation_score,
    }