"""Live camera capture with threaded frame reading for consistent FPS. Supports: - USB webcam: CameraStream(0) - RTSP/IP cam: CameraStream("rtsp://user:pass@192.168.1.100:554/stream") - Video file (treated as stream): CameraStream("/path/to/video.mp4") """ import cv2 import threading import time class CameraStream: """Threaded camera reader — always holds the latest frame, never blocks.""" def __init__(self, source=0, target_fps=15): self.source = source self.target_fps = target_fps self._cap = cv2.VideoCapture(source) if not self._cap.isOpened(): raise RuntimeError(f"Cannot open camera: {source}") # Read one frame to get dimensions ret, frame = self._cap.read() if not ret: raise RuntimeError(f"Cannot read from camera: {source}") self.frame = frame self.w = int(self._cap.get(3)) self.h = int(self._cap.get(4)) self.native_fps = self._cap.get(5) or 30.0 self._lock = threading.Lock() self._stop = threading.Event() self._thread = threading.Thread(target=self._reader, daemon=True) self._thread.start() print(f"[Camera] Opened {source} ({self.w}x{self.h} @ {self.native_fps:.0f}fps)", flush=True) def _reader(self): """Continuously grab frames in background.""" while not self._stop.is_set(): ret, frame = self._cap.read() if not ret: # End of stream or disconnect — try reconnect for RTSP if isinstance(self.source, str) and self.source.startswith("rtsp"): print("[Camera] Lost connection, reconnecting...", flush=True) time.sleep(2) self._cap.release() self._cap = cv2.VideoCapture(self.source) continue break with self._lock: self.frame = frame def read(self): """Get the latest frame. Never blocks.""" with self._lock: return self.frame.copy() if self.frame is not None else None @property def is_open(self): return not self._stop.is_set() and self._cap.isOpened() def release(self): self._stop.set() self._thread.join(timeout=3) self._cap.release()