animal-tracking-v2 / tracker.py
pvanand's picture
Upload tracker.py
99cde21 verified
"""
Object tracking module using YOLO
"""
import cv2
import time
from ultralytics import YOLO
from typing import Dict, Optional, Tuple
import numpy as np
from config import Config
# Import yt-dlp for YouTube stream if using YouTube stream
if Config.USE_YOUTUBE_STREAM:
try:
import yt_dlp
except ImportError:
print("Warning: yt-dlp not installed. Install with: pip install yt-dlp")
yt_dlp = None
class ObjectTracker:
"""Handles YOLO object detection and tracking"""
# COCO class IDs for animals only (excluding humans)
# 14: bird, 15: cat, 16: dog, 17: horse, 18: sheep, 19: cow,
# 20: elephant, 21: bear, 22: zebra, 23: giraffe
ANIMAL_CLASSES = [14, 15, 16, 17, 18, 19, 20, 21, 22, 23]
def __init__(self, model_path: str = None):
"""
Initialize the object tracker
Args:
model_path: Path to YOLO model file
"""
self.model_path = model_path or Config.YOLO_MODEL
self.model = None
self.fps = 0
self.frame_count = 0
self.start_time = time.time()
self.last_fps_update = time.time()
self._load_model()
def _load_model(self):
"""Load YOLO model"""
try:
print(f"Loading YOLO model: {self.model_path}")
self.model = YOLO(self.model_path)
print("βœ“ YOLO model loaded successfully")
except Exception as e:
print(f"βœ— Error loading YOLO model: {e}")
raise
def process_frame(self, frame: np.ndarray, track: bool = True) -> Tuple[np.ndarray, Dict]:
"""
Process a frame with YOLO tracking
Args:
frame: Input frame (BGR format)
track: Whether to use tracking (True) or just detection (False)
Returns:
Tuple of (annotated_frame, metadata)
"""
self.frame_count += 1
# Update FPS every second
current_time = time.time()
if current_time - self.last_fps_update >= 1.0:
self.fps = self.frame_count / (current_time - self.start_time)
self.last_fps_update = current_time
# Run YOLO inference (only detect animals, excluding humans)
if track:
results = self.model.track(
frame,
persist=True,
tracker=Config.TRACKER_CONFIG,
verbose=False,
conf=Config.CONFIDENCE_THRESHOLD,
classes=self.ANIMAL_CLASSES
)
else:
results = self.model(
frame,
verbose=False,
conf=Config.CONFIDENCE_THRESHOLD,
classes=self.ANIMAL_CLASSES
)
# Extract metadata
metadata = self._extract_metadata(results[0])
# Get annotated frame (without labels/class names)
annotated_frame = results[0].plot(labels=False)
return annotated_frame, metadata
def _extract_metadata(self, result) -> Dict:
"""
Extract detection metadata from YOLO result
Args:
result: YOLO result object
Returns:
Dictionary containing detection metadata
"""
detections = []
detected_classes = set()
if result.boxes is not None and len(result.boxes) > 0:
boxes = result.boxes
track_ids = boxes.id.cpu().numpy().tolist() if boxes.id is not None else [None] * len(boxes)
class_ids = boxes.cls.cpu().numpy().tolist()
confidences = boxes.conf.cpu().numpy().tolist()
bboxes = boxes.xyxy.cpu().numpy().tolist() # [x1, y1, x2, y2]
for i, class_id in enumerate(class_ids):
class_name = self.model.names[int(class_id)]
detected_classes.add(class_name)
detection = {
"track_id": int(track_ids[i]) if track_ids[i] is not None else None,
"class_id": int(class_id),
"class_name": class_name,
"confidence": round(confidences[i], 3),
"bbox": [round(coord, 2) for coord in bboxes[i]]
}
detections.append(detection)
metadata = {
"num_detections": len(detections),
"detections": detections,
"detected_classes": list(sorted(detected_classes)),
"fps": round(self.fps, 1),
"frame_count": self.frame_count
}
return metadata
def get_fps(self) -> float:
"""Get current FPS"""
return self.fps
def reset_stats(self):
"""Reset frame count and timing stats"""
self.frame_count = 0
self.start_time = time.time()
self.last_fps_update = time.time()
self.fps = 0
class CameraCapture:
"""Handles camera capture with configuration"""
def __init__(self, camera_index: int = None, width: int = None, height: int = None, youtube_url: str = None):
"""
Initialize camera capture
Args:
camera_index: Camera device index (used if not using YouTube)
width: Frame width
height: Frame height
youtube_url: YouTube video URL (overrides Config.YOUTUBE_URL if provided)
"""
self.camera_index = camera_index or Config.CAMERA_INDEX
self.width = width or Config.CAMERA_WIDTH
self.height = height or Config.CAMERA_HEIGHT
self.use_youtube = Config.USE_YOUTUBE_STREAM
self.youtube_url = youtube_url or Config.YOUTUBE_URL
self.cap = None
self._first_frame = None # Store first frame read during initialization
self._initialize_camera()
def _initialize_camera(self):
"""Initialize camera with settings"""
if self.use_youtube:
if yt_dlp is None:
raise RuntimeError("yt-dlp is not installed. Install with: pip install yt-dlp")
print(f"Initializing YouTube stream from: {self.youtube_url}")
try:
# Use yt-dlp to get the stream URL
ydl_opts = {
'format': 'best[height<=720]/best', # Prefer 720p or lower, fallback to best
'quiet': False,
'no_warnings': False,
'force_ipv4': True, # Force IPv4 to avoid DNS resolution issues
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
print(" Extracting stream URL...")
info = ydl.extract_info(self.youtube_url, download=False)
# Try to get the direct stream URL
if 'url' in info:
stream_url = info['url']
elif 'requested_formats' in info and len(info['requested_formats']) > 0:
stream_url = info['requested_formats'][0].get('url')
elif 'formats' in info and len(info['formats']) > 0:
# Find the best video format
video_formats = [f for f in info['formats'] if f.get('vcodec') != 'none']
if video_formats:
# Sort by height (prefer 720p or lower)
video_formats.sort(key=lambda x: x.get('height', 0), reverse=True)
# Filter to 720p or lower if available
preferred = [f for f in video_formats if f.get('height', 9999) <= 720]
if preferred:
stream_url = preferred[0].get('url')
else:
stream_url = video_formats[0].get('url')
else:
raise RuntimeError("No video formats found")
else:
raise RuntimeError("Could not extract stream URL from video info")
if not stream_url:
raise RuntimeError("Stream URL is empty")
print(f" Opening stream URL...")
# Open the stream URL with OpenCV
self.cap = cv2.VideoCapture(stream_url)
if not self.cap.isOpened():
raise RuntimeError(f"Failed to open stream URL")
# Try to read a frame to verify it works
ret, test_frame = self.cap.read()
if not ret or test_frame is None:
raise RuntimeError("Failed to read initial frame from stream")
# Store first frame so we don't lose it
self._first_frame = test_frame
actual_height, actual_width = test_frame.shape[:2]
print(f"βœ“ YouTube stream initialized: {actual_width}x{actual_height}")
except Exception as e:
error_msg = str(e)
print(f"βœ— Error details: {error_msg}")
raise RuntimeError(f"Failed to initialize YouTube stream from {self.youtube_url}: {error_msg}")
else:
self.cap = cv2.VideoCapture(self.camera_index)
if not self.cap.isOpened():
raise RuntimeError(f"Failed to open camera {self.camera_index}")
# Set resolution
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
# Verify resolution
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"βœ“ Camera initialized: {actual_width}x{actual_height}")
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
"""Read a frame from camera"""
if self.cap is None:
return False, None
# If we have a stored first frame (from YouTube initialization), return it first
if self._first_frame is not None:
frame = self._first_frame
self._first_frame = None
return True, frame
return self.cap.read()
def release(self):
"""Release camera resources"""
if self.cap is not None:
self.cap.release()
self.cap = None
def is_opened(self) -> bool:
"""Check if camera is opened"""
return self.cap is not None and self.cap.isOpened()
def change_youtube_url(self, new_url: str):
"""
Change the YouTube URL and reinitialize the stream
Args:
new_url: New YouTube video URL
Returns:
bool: True if successful, False otherwise
"""
if not self.use_youtube:
return False
try:
# Release old stream
if self.cap:
self.cap.release()
self.cap = None
# Update URL
self.youtube_url = new_url
self._first_frame = None
# Reinitialize with new URL
self._initialize_camera()
return True
except Exception as e:
print(f"βœ— Error changing YouTube URL: {e}")
return False
def __del__(self):
"""Cleanup on deletion"""
self.release()