Spaces:
Sleeping
Sleeping
| """ | |
| Object tracking module using YOLO | |
| """ | |
| import cv2 | |
| import time | |
| from ultralytics import YOLO | |
| from typing import Dict, Optional, Tuple | |
| import numpy as np | |
| from config import Config | |
# yt-dlp is only needed when frames come from a YouTube stream.
# Bind the name unconditionally so that later `yt_dlp is None` checks work
# (instead of raising NameError) when the import was skipped or failed.
yt_dlp = None
if Config.USE_YOUTUBE_STREAM:
    try:
        import yt_dlp
    except ImportError:
        print("Warning: yt-dlp not installed. Install with: pip install yt-dlp")
class ObjectTracker:
    """Handles YOLO object detection and tracking.

    Only the COCO animal classes listed in ``ANIMAL_CLASSES`` are requested
    from the model. The tracker also keeps a running frame count and an
    average frames-per-second figure that is reported in each frame's metadata.
    """

    # COCO class IDs for animals only (excluding humans)
    # 14: bird, 15: cat, 16: dog, 17: horse, 18: sheep, 19: cow,
    # 20: elephant, 21: bear, 22: zebra, 23: giraffe
    ANIMAL_CLASSES = [14, 15, 16, 17, 18, 19, 20, 21, 22, 23]

    def __init__(self, model_path: Optional[str] = None):
        """
        Initialize the object tracker and load the model.

        Args:
            model_path: Path to YOLO model file. Falls back to
                ``Config.YOLO_MODEL`` when omitted or empty.

        Raises:
            Exception: Whatever the YOLO constructor raises is propagated
                by ``_load_model``.
        """
        self.model_path = model_path or Config.YOLO_MODEL
        self.model = None
        # Float from the start so get_fps() always matches its annotation.
        self.fps = 0.0
        self.frame_count = 0
        self.start_time = time.time()
        self.last_fps_update = time.time()
        self._load_model()

    def _load_model(self):
        """Load the YOLO model from ``self.model_path``.

        Raises:
            Exception: Re-raised unchanged after logging when loading fails.
        """
        # NOTE(review): the leading "β" in the messages below looks like a
        # mis-decoded status glyph - confirm the intended character.
        try:
            print(f"Loading YOLO model: {self.model_path}")
            self.model = YOLO(self.model_path)
            print("β YOLO model loaded successfully")
        except Exception as e:
            print(f"β Error loading YOLO model: {e}")
            raise

    def process_frame(self, frame: np.ndarray, track: bool = True) -> Tuple[np.ndarray, Dict]:
        """
        Process a frame with YOLO tracking.

        Args:
            frame: Input frame (BGR format)
            track: Whether to use tracking (True) or just detection (False)

        Returns:
            Tuple of (annotated_frame, metadata)
        """
        self.frame_count += 1

        # Refresh the FPS figure at most once per second. The value is the
        # average over every frame since start_time, not an instantaneous rate.
        current_time = time.time()
        if current_time - self.last_fps_update >= 1.0:
            self.fps = self.frame_count / (current_time - self.start_time)
            self.last_fps_update = current_time

        # Options shared by both modes (only detect animals, excluding humans)
        inference_options = {
            "verbose": False,
            "conf": Config.CONFIDENCE_THRESHOLD,
            "classes": self.ANIMAL_CLASSES,
        }
        if track:
            results = self.model.track(
                frame,
                persist=True,
                tracker=Config.TRACKER_CONFIG,
                **inference_options,
            )
        else:
            results = self.model(frame, **inference_options)

        # A single frame was passed in, so only the first result is used.
        result = results[0]
        metadata = self._extract_metadata(result)

        # Get annotated frame (without labels/class names)
        annotated_frame = result.plot(labels=False)
        return annotated_frame, metadata

    def _extract_metadata(self, result) -> Dict:
        """
        Extract detection metadata from a YOLO result.

        Args:
            result: YOLO result object; its ``boxes`` attribute may be None
                or empty when nothing was detected.

        Returns:
            Dictionary with ``num_detections``, ``detections`` (one dict per
            box), ``detected_classes`` (sorted unique class names), ``fps``
            and ``frame_count``.
        """
        detections = []
        boxes = result.boxes

        if boxes is not None and len(boxes) > 0:
            class_ids = boxes.cls.cpu().numpy().tolist()
            confidences = boxes.conf.cpu().numpy().tolist()
            bboxes = boxes.xyxy.cpu().numpy().tolist()  # [x1, y1, x2, y2]

            # boxes.id is None when no track IDs are available
            # (e.g. plain detection); report None per detection in that case.
            if boxes.id is not None:
                track_ids = boxes.id.cpu().numpy().tolist()
            else:
                track_ids = [None] * len(class_ids)

            for track_id, class_id, confidence, bbox in zip(
                track_ids, class_ids, confidences, bboxes
            ):
                class_index = int(class_id)
                detections.append({
                    "track_id": int(track_id) if track_id is not None else None,
                    "class_id": class_index,
                    "class_name": self.model.names[class_index],
                    "confidence": round(confidence, 3),
                    "bbox": [round(coord, 2) for coord in bbox],
                })

        detected_classes = sorted({d["class_name"] for d in detections})

        return {
            "num_detections": len(detections),
            "detections": detections,
            "detected_classes": detected_classes,
            "fps": round(self.fps, 1),
            "frame_count": self.frame_count,
        }

    def get_fps(self) -> float:
        """Get the most recently computed average FPS."""
        return self.fps

    def reset_stats(self):
        """Reset frame count and timing stats."""
        self.frame_count = 0
        self.start_time = time.time()
        self.last_fps_update = time.time()
        self.fps = 0.0
class CameraCapture:
    """Handles frame capture from a local camera or a YouTube stream.

    The source is chosen by ``Config.USE_YOUTUBE_STREAM`` at construction
    time. For YouTube, yt-dlp resolves a direct stream URL which is then
    opened with OpenCV.
    """

    def __init__(self, camera_index: Optional[int] = None, width: Optional[int] = None,
                 height: Optional[int] = None, youtube_url: Optional[str] = None):
        """
        Initialize camera capture.

        Args:
            camera_index: Camera device index (used if not using YouTube).
                Falls back to ``Config.CAMERA_INDEX`` only when None, so an
                explicit index 0 is honoured.
            width: Frame width (falls back to ``Config.CAMERA_WIDTH``)
            height: Frame height (falls back to ``Config.CAMERA_HEIGHT``)
            youtube_url: YouTube video URL (overrides Config.YOUTUBE_URL if provided)

        Raises:
            RuntimeError: If the camera or stream cannot be opened.
        """
        # Assigned first so release()/__del__ are safe even if a later line raises.
        self.cap = None
        self._first_frame = None  # Store first frame read during initialization

        # `is not None` instead of `or`: index 0 is a valid device but falsy.
        self.camera_index = camera_index if camera_index is not None else Config.CAMERA_INDEX
        self.width = width or Config.CAMERA_WIDTH
        self.height = height or Config.CAMERA_HEIGHT
        self.use_youtube = Config.USE_YOUTUBE_STREAM
        self.youtube_url = youtube_url or Config.YOUTUBE_URL
        self._initialize_camera()

    def _initialize_camera(self):
        """Initialize the capture source selected by ``self.use_youtube``.

        Raises:
            RuntimeError: If the selected source cannot be opened.
        """
        if self.use_youtube:
            self._open_youtube_stream()
        else:
            self._open_local_camera()

    @staticmethod
    def _select_stream_url(info: Optional[Dict]) -> str:
        """Pick a direct stream URL out of a yt-dlp info dict.

        Preference order: top-level ``url``, then the first entry of
        ``requested_formats``, then the tallest video format at or below
        720p from ``formats`` (or the tallest overall if none qualifies).

        Args:
            info: Info dict as returned by ``extract_info``.

        Returns:
            The selected, non-empty stream URL.

        Raises:
            RuntimeError: If no usable URL can be found.
        """
        if not info:
            raise RuntimeError("Could not extract stream URL from video info")

        if 'url' in info:
            stream_url = info['url']
        elif info.get('requested_formats'):
            stream_url = info['requested_formats'][0].get('url')
        elif info.get('formats'):
            # Find the best video format
            video_formats = [f for f in info['formats'] if f.get('vcodec') != 'none']
            if not video_formats:
                raise RuntimeError("No video formats found")

            def height_of(fmt):
                # The height may be missing or None; rank such formats lowest.
                return fmt.get('height') or 0

            # Prefer 720p or lower; formats of unknown height are not preferred.
            preferred = [
                f for f in video_formats
                if f.get('height') is not None and f['height'] <= 720
            ]
            chosen = max(preferred or video_formats, key=height_of)
            stream_url = chosen.get('url')
        else:
            raise RuntimeError("Could not extract stream URL from video info")

        if not stream_url:
            raise RuntimeError("Stream URL is empty")
        return stream_url

    def _open_youtube_stream(self):
        """Resolve ``self.youtube_url`` with yt-dlp and open it with OpenCV.

        The first frame is read to verify the stream and kept in
        ``self._first_frame`` so that ``read()`` can still return it.

        Raises:
            RuntimeError: If yt-dlp is unavailable, or if URL extraction,
                opening, or the first read fails. In the latter cases the
                capture is released before raising.
        """
        if yt_dlp is None:
            raise RuntimeError("yt-dlp is not installed. Install with: pip install yt-dlp")

        print(f"Initializing YouTube stream from: {self.youtube_url}")
        # NOTE(review): the leading "β" in the messages below looks like a
        # mis-decoded status glyph - confirm the intended character.
        try:
            # Use yt-dlp to get the stream URL
            ydl_opts = {
                'format': 'best[height<=720]/best',  # Prefer 720p or lower, fallback to best
                'quiet': False,
                'no_warnings': False,
                'force_ipv4': True,  # Force IPv4 to avoid DNS resolution issues
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                print(" Extracting stream URL...")
                info = ydl.extract_info(self.youtube_url, download=False)
            stream_url = self._select_stream_url(info)

            print(" Opening stream URL...")
            # Open the stream URL with OpenCV
            self.cap = cv2.VideoCapture(stream_url)
            if not self.cap.isOpened():
                raise RuntimeError("Failed to open stream URL")

            # Try to read a frame to verify it works
            ret, test_frame = self.cap.read()
            if not ret or test_frame is None:
                raise RuntimeError("Failed to read initial frame from stream")

            # Store first frame so we don't lose it
            self._first_frame = test_frame
            actual_height, actual_width = test_frame.shape[:2]
            print(f"β YouTube stream initialized: {actual_width}x{actual_height}")
        except Exception as e:
            # Do not leave a half-initialised capture behind: release it so
            # is_opened() reports False and the handle is not leaked.
            self.release()
            self._first_frame = None
            error_msg = str(e)
            print(f"β Error details: {error_msg}")
            raise RuntimeError(
                f"Failed to initialize YouTube stream from {self.youtube_url}: {error_msg}"
            ) from e

    def _open_local_camera(self):
        """Open the local camera at ``self.camera_index`` and request the resolution.

        Raises:
            RuntimeError: If the camera cannot be opened.
        """
        self.cap = cv2.VideoCapture(self.camera_index)
        if not self.cap.isOpened():
            self.release()
            raise RuntimeError(f"Failed to open camera {self.camera_index}")

        # Set resolution
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)

        # Report the resolution the capture object says it is using, which
        # is read back rather than assumed to equal the requested one.
        actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"β Camera initialized: {actual_width}x{actual_height}")

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Read a frame from the capture source.

        Returns:
            ``(False, None)`` when no capture is open; otherwise
            ``(success, frame)``. The frame stored during YouTube
            initialization is returned once before reading from the capture.
        """
        if self.cap is None:
            return False, None

        # If we have a stored first frame (from YouTube initialization), return it first
        if self._first_frame is not None:
            frame = self._first_frame
            self._first_frame = None
            return True, frame

        return self.cap.read()

    def release(self):
        """Release camera resources. Safe to call more than once."""
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def is_opened(self) -> bool:
        """Check if camera is opened."""
        return self.cap is not None and self.cap.isOpened()

    def change_youtube_url(self, new_url: str):
        """
        Change the YouTube URL and reinitialize the stream.

        Args:
            new_url: New YouTube video URL

        Returns:
            bool: True if successful, False otherwise. Returns False without
            touching the current stream when YouTube mode is off or
            ``new_url`` is empty.
        """
        if not self.use_youtube:
            return False
        if not new_url:
            # Nothing to switch to; keep the working stream instead of
            # tearing it down for an input that cannot succeed.
            return False

        try:
            # Release old stream
            self.release()

            # Update URL
            self.youtube_url = new_url
            self._first_frame = None

            # Reinitialize with new URL
            self._initialize_camera()
            return True
        except Exception as e:
            print(f"β Error changing YouTube URL: {e}")
            return False

    def __del__(self):
        """Cleanup on deletion."""
        # getattr guard: the instance may never have had `cap` assigned.
        if getattr(self, "cap", None) is not None:
            self.release()