Spaces:
Sleeping
Sleeping
| """ | |
| MotionScope Pro - Core Movement Detection Engine | |
| Combines MediaPipe HandLandmarker (tasks API) with background subtraction. | |
| """ | |
| import os | |
| import urllib.request | |
| import cv2 | |
| import numpy as np | |
| import mediapipe as mp | |
| from enum import Enum | |
| from dataclasses import dataclass | |
| from typing import Tuple, Generator | |
# Location of the hand landmarker model file: same directory as this module.
_MODEL_PATH = os.path.join(os.path.dirname(__file__), "hand_landmarker.task")

# Short private aliases for the MediaPipe Tasks API names used in this module.
_RunningMode = mp.tasks.vision.RunningMode
_HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions
_HandLandmarker = mp.tasks.vision.HandLandmarker
_BaseOptions = mp.tasks.BaseOptions
def _ensure_model_exists():
    """Download the hand landmarker model to ``_MODEL_PATH`` if it is missing.

    The download is written to a temporary ``.part`` file and only renamed to
    ``_MODEL_PATH`` once it has completed. An interrupted or failed download
    therefore never leaves a truncated file at ``_MODEL_PATH`` that a later
    call would mistake for a valid model.

    Raises
    ------
    Whatever ``urllib.request.urlretrieve`` or the file operations raise
    (network errors, ``OSError``); the partial file is removed first.
    """
    if os.path.exists(_MODEL_PATH):
        return

    print(f"Downloading model to {_MODEL_PATH}...")
    url = "https://storage.googleapis.com/mediapipe-models/hand_landmarker/hand_landmarker/float16/latest/hand_landmarker.task"
    tmp_path = _MODEL_PATH + ".part"
    try:
        urllib.request.urlretrieve(url, tmp_path)
        # Rename only after a complete download; os.replace overwrites
        # atomically when source and target are on the same filesystem.
        os.replace(tmp_path, _MODEL_PATH)
    finally:
        # After a successful rename the temp file no longer exists; after a
        # failure this removes whatever partial data was written.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
class DetectionMode(Enum):
    """Processing pipelines a frame can be run through.

    Each member's value is a human-readable label string.
    """

    # Hand landmark detection and drawing only.
    HAND_TRACKING = "Hand Tracking"

    # Background-subtraction motion detection only.
    MOTION_DETECTION = "Motion Detection"

    # Motion detection first, then hand landmarks on the same frame.
    COMBINED = "Combined"
@dataclass
class DetectionConfig:
    """Tunable parameters for detection.

    Declared as a dataclass so individual settings can be overridden at
    construction time, e.g. ``DetectionConfig(max_num_hands=1)``, while
    ``DetectionConfig()`` still yields the defaults below.
    """

    # --- MediaPipe hand settings -------------------------------------
    # Passed to the hand landmarker as ``min_hand_detection_confidence``.
    min_detection_confidence: float = 0.5
    # Passed to the hand landmarker as ``min_tracking_confidence``.
    min_tracking_confidence: float = 0.5
    # Passed to the hand landmarker as ``num_hands``.
    max_num_hands: int = 2

    # --- Motion detection settings -----------------------------------
    # Binary threshold applied to the raw foreground mask; only pixels
    # strictly above this value are kept.
    motion_threshold: int = 180
    # Contours whose area is not greater than this are ignored.
    min_contour_area: int = 1000
    # Kernel size for the Gaussian blur applied to the thresholded mask.
    blur_kernel_size: Tuple[int, int] = (5, 5)
    # Size of the elliptical structuring element used for open/close.
    morph_kernel_size: Tuple[int, int] = (3, 3)

    # --- Background subtractor (MOG2) settings -----------------------
    bg_history: int = 500
    bg_var_threshold: int = 16
    bg_detect_shadows: bool = True
class MovementDetector:
    """Movement detector combining MediaPipe hands + MOG2 background subtraction.

    Holds a MediaPipe ``HandLandmarker`` (created in IMAGE running mode), an
    OpenCV MOG2 background subtractor, and a counter of frames processed
    through :meth:`process_frame`. Call :meth:`release` when done to close
    the hand landmarker.
    """

    # Pairs of hand-landmark indices joined by a line when drawing the
    # skeleton (23 connections over the 21 landmarks, indices 0-20).
    _HAND_CONNECTIONS = (
        (0, 1), (1, 2), (2, 3), (3, 4),          # Thumb
        (0, 5), (5, 6), (6, 7), (7, 8),          # Index
        (0, 9), (9, 10), (10, 11), (11, 12),     # Middle
        (0, 13), (13, 14), (14, 15), (15, 16),   # Ring
        (0, 17), (17, 18), (18, 19), (19, 20),   # Pinky
        (5, 9), (9, 13), (13, 17),               # Palm
    )

    def __init__(self, config: DetectionConfig | None = None):
        """Create the detector, using default settings when *config* is omitted."""
        self.config = config or DetectionConfig()
        self.hand_landmarker = self._build_hand_landmarker()
        self.back_sub = self._build_back_sub()
        self.frame_count: int = 0

    # ------------------------------------------------------------------
    # Builder helpers
    # ------------------------------------------------------------------
    def _build_hand_landmarker(self):
        """Create a HandLandmarker from the current config.

        Ensures the model file is present first (downloading it if needed).
        """
        _ensure_model_exists()
        options = _HandLandmarkerOptions(
            base_options=_BaseOptions(model_asset_path=_MODEL_PATH),
            running_mode=_RunningMode.IMAGE,
            num_hands=self.config.max_num_hands,
            min_hand_detection_confidence=self.config.min_detection_confidence,
            min_tracking_confidence=self.config.min_tracking_confidence,
        )
        return _HandLandmarker.create_from_options(options)

    def _build_back_sub(self):
        """Create a fresh MOG2 background subtractor from the current config."""
        return cv2.createBackgroundSubtractorMOG2(
            history=self.config.bg_history,
            varThreshold=self.config.bg_var_threshold,
            detectShadows=self.config.bg_detect_shadows,
        )

    def rebuild(self, config: DetectionConfig):
        """Rebuild internal models when the user changes settings.

        The new background subtractor and hand landmarker are built before
        the old landmarker is closed. If building fails, the exception
        propagates and the detector keeps its previous config and its
        still-open previous landmarker.
        """
        previous_config = self.config
        self.config = config  # the builders read self.config
        try:
            new_back_sub = self._build_back_sub()
            new_landmarker = self._build_hand_landmarker()
        except Exception:
            self.config = previous_config
            raise

        self.hand_landmarker.close()
        self.hand_landmarker = new_landmarker
        self.back_sub = new_back_sub
        self.frame_count = 0

    # ------------------------------------------------------------------
    # Hand detection (new tasks API)
    # ------------------------------------------------------------------
    def detect_hands(self, frame: np.ndarray) -> np.ndarray:
        """Detect hands and draw landmarks + labels on *frame* (BGR).

        Uses the MediaPipe tasks API HandLandmarker. *frame* is drawn on in
        place and the same array is returned.
        """
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
        result = self.hand_landmarker.detect(mp_image)

        if not result.hand_landmarks:
            return frame

        h, w = frame.shape[:2]
        for idx, landmarks in enumerate(result.hand_landmarks):
            # Connections are drawn manually: the result holds a plain list
            # of landmarks rather than a NormalizedLandmarkList.
            self._draw_hand_skeleton(frame, landmarks, w, h)

            # Label near the wrist (landmark 0).
            wrist = landmarks[0]
            cx, cy = int(wrist.x * w), int(wrist.y * h)
            label = "Hand"
            if result.handedness and idx < len(result.handedness):
                label = result.handedness[idx][0].category_name
            cv2.putText(
                frame, label, (cx - 30, cy - 20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2,
            )
        return frame

    def _draw_hand_skeleton(self, frame, landmarks, w, h):
        """Draw landmark points and connections on *frame*.

        *landmarks* carry normalized ``x``/``y`` attributes, which are scaled
        by the frame width *w* and height *h* to pixel coordinates.
        """
        pts = [(int(lm.x * w), int(lm.y * h)) for lm in landmarks]

        # Connections first so the dots are painted on top of the lines.
        for start, end in self._HAND_CONNECTIONS:
            cv2.line(frame, pts[start], pts[end], (0, 255, 0), 2)

        # Filled dot with a thin white outline for each landmark.
        for pt in pts:
            cv2.circle(frame, pt, 5, (255, 0, 128), -1)
            cv2.circle(frame, pt, 5, (255, 255, 255), 1)

    # ------------------------------------------------------------------
    # Motion detection
    # ------------------------------------------------------------------
    def detect_motion(self, frame: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
        """Background-subtraction motion detection.

        Feeds *frame* to the background subtractor, cleans the foreground
        mask (threshold, blur, morphological open then close) and draws a
        box plus area label around every external contour whose area
        exceeds ``config.min_contour_area``. *frame* is drawn on in place.

        Returns
        -------
        processed : BGR frame with bounding boxes (the same array as *frame*)
        mask      : cleaned foreground mask
        count     : number of detected moving objects
        """
        fg_mask = self.back_sub.apply(frame)

        # Keep only pixels strictly above the threshold.
        # NOTE(review): with shadow detection on, MOG2 is documented to mark
        # shadows with a mid-gray value, which the default threshold of 180
        # would discard - confirm against the OpenCV docs.
        _, mask_thresh = cv2.threshold(
            fg_mask, self.config.motion_threshold, 255, cv2.THRESH_BINARY,
        )
        mask_blur = cv2.GaussianBlur(mask_thresh, self.config.blur_kernel_size, 0)
        kernel = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE, self.config.morph_kernel_size,
        )
        mask_clean = cv2.morphologyEx(mask_blur, cv2.MORPH_OPEN, kernel)
        mask_clean = cv2.morphologyEx(mask_clean, cv2.MORPH_CLOSE, kernel)

        contours, _ = cv2.findContours(
            mask_clean, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE,
        )

        count = 0
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area <= self.config.min_contour_area:
                continue
            count += 1
            x, y, bw, bh = cv2.boundingRect(cnt)
            cv2.rectangle(frame, (x, y), (x + bw, y + bh), (0, 0, 255), 2)
            # Clamp the label baseline so it stays inside the frame when the
            # box touches the top edge.
            label_y = max(y - 10, 15)
            cv2.putText(
                frame, f"Area: {int(area)}", (x, label_y),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2,
            )

        cv2.putText(
            frame, f"Moving objects: {count}", (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2,
        )
        return frame, mask_clean, count

    # ------------------------------------------------------------------
    # High-level frame dispatcher
    # ------------------------------------------------------------------
    def process_frame(self, frame: np.ndarray, mode: DetectionMode) -> np.ndarray:
        """Process a single frame according to the selected *mode*.

        Increments ``frame_count`` and works on a copy, so the caller's
        *frame* is left unmodified. An unrecognised *mode* returns the
        untouched copy.
        """
        self.frame_count += 1
        out = frame.copy()

        if mode == DetectionMode.HAND_TRACKING:
            return self.detect_hands(out)
        elif mode == DetectionMode.MOTION_DETECTION:
            processed, _, _ = self.detect_motion(out)
            return processed
        elif mode == DetectionMode.COMBINED:
            # Motion overlay first, then hand landmarks drawn on top of it.
            motion_frame, _, _ = self.detect_motion(out)
            return self.detect_hands(motion_frame)
        return out

    # ------------------------------------------------------------------
    # Full-video processing generator
    # ------------------------------------------------------------------
    def process_video(
        self,
        source: str,
        mode: DetectionMode = DetectionMode.MOTION_DETECTION,
        output_path: str = "output.mp4",
    ) -> Generator[Tuple[np.ndarray | None, str | None, float], None, None]:
        """Iterate over every frame in *source*, yield processed RGB frames.

        Each processed frame is also written to *output_path* (``mp4v``
        codec) at the source frame rate, falling back to 30 fps when the
        source does not report a usable rate. ``frame_count`` and the
        background model are reset before processing starts.

        Yields
        ------
        (display_frame_rgb | None, output_path | None, progress)
            One ``(frame, None, progress)`` tuple per frame, then a final
            ``(None, output_path, 1.0)`` once the output has been finalised.

        Raises
        ------
        ValueError
            If *source* cannot be opened for reading or *output_path* cannot
            be opened for writing. Raised on first iteration, since this is
            a generator.
        """
        self.frame_count = 0
        self.back_sub = self._build_back_sub()  # fresh background model

        cap = cv2.VideoCapture(source)
        writer = None
        try:
            if not cap.isOpened():
                raise ValueError(f"Cannot open video: {source}")

            frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Keep the fractional frame rate (e.g. 29.97): truncating it to
            # an int makes the output play at the wrong speed. The chained
            # comparison is False for 0, negatives, NaN and infinity.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not (0 < fps < float("inf")):
                fps = 30.0

            # Treat a zero or negative reported frame count as 1 so that
            # progress never goes negative or divides by zero.
            total_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1)

            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_w, frame_h))
            if not writer.isOpened():
                # Without this check no output file would be produced even
                # though the generator still yields output_path.
                raise ValueError(f"Cannot open video writer: {output_path}")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                processed = self.process_frame(frame, mode)
                writer.write(processed)
                display = cv2.cvtColor(processed, cv2.COLOR_BGR2RGB)
                progress = min(self.frame_count / total_frames, 1.0)
                yield display, None, progress
        finally:
            # Also runs when the consumer closes the generator early.
            cap.release()
            if writer is not None:
                writer.release()

        yield None, output_path, 1.0

    # ------------------------------------------------------------------
    # Cleanup
    # ------------------------------------------------------------------
    def release(self):
        """Free resources by closing the hand landmarker."""
        self.hand_landmarker.close()