import os
import logging
from pathlib import Path
from typing import Iterable, List, Optional
import cv2
import numpy as np
import tensorflow as tf
_mp_import_error = None
mp_solutions = None
try:
    import mediapipe as mp  # keep this for version/file debug
    try:
        # Works even when mp.solutions is not exposed
        from mediapipe.python import solutions as mp_solutions  # type: ignore
    except Exception:
        # Fallback for older package layouts
        from mediapipe import solutions as mp_solutions  # type: ignore
except Exception as exc:
    _mp_import_error = exc
    mp_solutions = None
if os.getenv("DEBUG_MEDIAPIPE", "0") == "1":
    try:
        import mediapipe as mp
        print("mediapipe version:", getattr(mp, "__version__", "unknown"))
        print("mediapipe file:", getattr(mp, "__file__", "unknown"))
        print("has solutions attr:", hasattr(mp, "solutions"))
        # Also verify the actual module we will use:
        print("mp_solutions module:", getattr(mp_solutions, "__name__", None))
    except Exception as dbg_exc:
        print("mediapipe debug import failed:", dbg_exc)
# ------------------------------------------------------------------
# Local imports
# ------------------------------------------------------------------
from . import config
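# The local config module is expected to provide TARGET_SIZE, MAX_FRAMES,
# DETECTION_CONFIDENCE and TRACKING_CONFIDENCE; they are used as the
# constructor defaults below.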
logger = logging.getLogger(__name__)


class VideoPreprocessor:
    """
    Handles frame extraction and normalization from either a video file
    or an iterable of pre-captured frames.
    """

    def __init__(
        self,
        target_size: int = config.TARGET_SIZE,
        max_frames: Optional[int] = config.MAX_FRAMES,
        detection_confidence: float = config.DETECTION_CONFIDENCE,
        tracking_confidence: float = config.TRACKING_CONFIDENCE,
    ):
        self.target_size = target_size
        self.max_frames = max_frames
        self.detection_confidence = detection_confidence
        self.tracking_confidence = tracking_confidence

        # Lip landmark indices in the MediaPipe Face Mesh topology, used to
        # compute the lip bounding box in `_process_frame`.
        self.UPPER_LIP_INDICES = [61, 185, 40, 39, 37, 0, 267, 269, 270, 409, 291]
        self.LOWER_LIP_INDICES = [146, 91, 181, 84, 17, 314, 405, 321, 375, 291]
        self.LIP_INDICES = self.UPPER_LIP_INDICES + self.LOWER_LIP_INDICES

    def _require_face_mesh_module(self):
        if mp_solutions is None:
            raise RuntimeError(
                "Mediapipe is not installed correctly. "
                "Please install with `pip install mediapipe` (>=0.10). "
                f"Original import error: {_mp_import_error}"
            )
        return mp_solutions.face_mesh

    def preprocess_video(self, video_path: str) -> Optional[tf.Tensor]:
        """
        Preprocess frames from a video file path.

        Returns a normalized tensor of shape
        (num_frames, target_size, target_size, 1), or None if no usable
        frames are found.
        """
        path = Path(video_path)
        if not path.exists():
            logger.error("Video path does not exist: %s", video_path)
            return None

        cap = cv2.VideoCapture(str(path))
        if not cap.isOpened():
            logger.error("Failed to open video: %s", video_path)
            return None

        frames: List[tf.Tensor] = []
        try:
            face_mesh_module = self._require_face_mesh_module()
            with face_mesh_module.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                refine_landmarks=True,
                min_detection_confidence=self.detection_confidence,
                min_tracking_confidence=self.tracking_confidence,
            ) as face_mesh:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                    processed = self._process_frame(frame, face_mesh)
                    if processed is not None:
                        frames.append(processed)
                        if self.max_frames and len(frames) >= self.max_frames:
                            logger.info("Reached max_frames=%s; stopping early.", self.max_frames)
                            break
        finally:
            cap.release()

        return self._finalize_frames(frames)

    def preprocess_frames(self, frames: Iterable[np.ndarray]) -> Optional[tf.Tensor]:
        """
        Preprocess frames that have already been captured (e.g., from a webcam).
        """
        processed_frames: List[tf.Tensor] = []
        face_mesh_module = self._require_face_mesh_module()
        with face_mesh_module.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=self.detection_confidence,
            min_tracking_confidence=self.tracking_confidence,
        ) as face_mesh:
            for frame in frames:
                processed = self._process_frame(frame, face_mesh)
                if processed is not None:
                    processed_frames.append(processed)
                    if self.max_frames and len(processed_frames) >= self.max_frames:
                        logger.info("Reached max_frames=%s; stopping early.", self.max_frames)
                        break
        return self._finalize_frames(processed_frames)
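
    # A hypothetical caller with a webcam might gather raw BGR frames like this
    # before handing them to `preprocess_frames` (device index and frame budget
    # below are illustrative, not part of this module):
    #
    #     cap = cv2.VideoCapture(0)
    #     raw_frames = []
    #     while cap.isOpened() and len(raw_frames) < 100:
    #         ret, frame = cap.read()
    #         if not ret:
    #             break
    #         raw_frames.append(frame)
    #     cap.release()
    #     tensor = VideoPreprocessor().preprocess_frames(raw_frames)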

    def _process_frame(self, frame: np.ndarray, face_mesh) -> Optional[tf.Tensor]:
        """
        Run landmark detection on a single frame and return a grayscale lip crop.
        """
        try:
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(rgb_frame)
            if not results.multi_face_landmarks:
                logger.debug("No face landmarks detected in frame.")
                return None

            face_landmarks = results.multi_face_landmarks[0]
            lip_landmarks = [face_landmarks.landmark[i] for i in self.LIP_INDICES]

            h, w, _ = frame.shape
            x_coords = [int(landmark.x * w) for landmark in lip_landmarks]
            y_coords = [int(landmark.y * h) for landmark in lip_landmarks]
            x_min, x_max = max(0, min(x_coords)), min(w, max(x_coords))
            y_min, y_max = max(0, min(y_coords)), min(h, max(y_coords))
            if x_max <= x_min or y_max <= y_min:
                logger.debug("Invalid lip bounding box; skipping frame.")
                return None

            # Crop from the RGB frame so that rgb_to_grayscale sees the channel
            # order it expects (cropping the BGR original would swap the
            # red/blue weights in the grayscale conversion).
            lip_frame = rgb_frame[y_min:y_max, x_min:x_max]
            lip_frame_resized = cv2.resize(lip_frame, (self.target_size, self.target_size))
            lip_frame_gray = tf.image.rgb_to_grayscale(lip_frame_resized)
            return lip_frame_gray
        except Exception as exc:
            logger.warning("Error processing frame: %s", exc)
            return None

    def _finalize_frames(self, frames: List[tf.Tensor]) -> Optional[tf.Tensor]:
        if not frames:
            logger.error("No frames extracted during preprocessing.")
            return None

        stacked = tf.stack(frames)
        stacked = tf.cast(stacked, tf.float32)

        # Standardize the clip to zero mean and unit variance; guard against a
        # zero standard deviation (e.g. a uniform clip) to avoid dividing by zero.
        mean = tf.math.reduce_mean(stacked)
        std = tf.math.reduce_std(stacked)
        if tf.math.equal(std, 0.0):
            std = tf.constant(1.0, dtype=tf.float32)
        normalized = (stacked - mean) / std
        return normalized
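

# ------------------------------------------------------------------
# Hypothetical usage sketch (illustrative, not part of the original API):
# drives the preprocessor over a video file passed on the command line.
# Because this module uses a relative import of `config`, run it as a module,
# e.g. `python -m <package>.<this_module> path/to/clip.mp4` (placeholders).
# ------------------------------------------------------------------
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    video_path = sys.argv[1] if len(sys.argv) > 1 else "sample.mp4"  # placeholder default
    preprocessor = VideoPreprocessor()
    tensor = preprocessor.preprocess_video(video_path)
    if tensor is None:
        logger.error("Preprocessing produced no usable frames for %s", video_path)
    else:
        logger.info("Preprocessed tensor shape: %s", tensor.shape)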