blind-nav / core /camera.py
Ramkumarnn's picture
Blind navigation MVP
00e634a
"""Live camera capture with threaded frame reading for consistent FPS.
Supports:
- USB webcam: CameraStream(0)
- RTSP/IP cam: CameraStream("rtsp://user:pass@192.168.1.100:554/stream")
- Video file (treated as stream): CameraStream("/path/to/video.mp4")
"""
import cv2
import threading
import time
class CameraStream:
"""Threaded camera reader — always holds the latest frame, never blocks."""
def __init__(self, source=0, target_fps=15):
self.source = source
self.target_fps = target_fps
self._cap = cv2.VideoCapture(source)
if not self._cap.isOpened():
raise RuntimeError(f"Cannot open camera: {source}")
# Read one frame to get dimensions
ret, frame = self._cap.read()
if not ret:
raise RuntimeError(f"Cannot read from camera: {source}")
self.frame = frame
self.w = int(self._cap.get(3))
self.h = int(self._cap.get(4))
self.native_fps = self._cap.get(5) or 30.0
self._lock = threading.Lock()
self._stop = threading.Event()
self._thread = threading.Thread(target=self._reader, daemon=True)
self._thread.start()
print(f"[Camera] Opened {source} ({self.w}x{self.h} @ {self.native_fps:.0f}fps)", flush=True)
def _reader(self):
"""Continuously grab frames in background."""
while not self._stop.is_set():
ret, frame = self._cap.read()
if not ret:
# End of stream or disconnect — try reconnect for RTSP
if isinstance(self.source, str) and self.source.startswith("rtsp"):
print("[Camera] Lost connection, reconnecting...", flush=True)
time.sleep(2)
self._cap.release()
self._cap = cv2.VideoCapture(self.source)
continue
break
with self._lock:
self.frame = frame
def read(self):
"""Get the latest frame. Never blocks."""
with self._lock:
return self.frame.copy() if self.frame is not None else None
@property
def is_open(self):
return not self._stop.is_set() and self._cap.isOpened()
def release(self):
self._stop.set()
self._thread.join(timeout=3)
self._cap.release()