ChickSense / utils /video /frame_reader.py
IceKhoffi's picture
Update utils/video/frame_reader.py
5adab85 verified
import os
import re
import cv2
import time
import threading
import subprocess
from collections import deque
from config import TUNING
class FrameReader(threading.Thread):
def __init__(self, video_url: str):
super().__init__(daemon=True)
self.video_url = video_url
self.buffer = deque(maxlen=TUNING["FRAME_READER_BUFFER_SIZE"])
self.fps = TUNING["FRAME_READER_FPS"]
self.running = threading.Event()
self.cap = None
self._is_file = self._looks_like_file(video_url)
def _looks_like_file(self, url: str) -> bool:
if os.path.exists(url):
return True
return not re.match(r'^[a-zA-Z]+://', url or "")
def _resolve_url(self, url: str) -> str:
if "youtube.com" in url or "youtu.be" in url:
try:
print(f"[FrameReader] Resolving YouTube URL: {url}")
result = subprocess.run(
["yt-dlp", "--get-url", url],
capture_output=True, text=True, check=True, timeout=30
)
urls = result.stdout.strip().splitlines()
if not urls:
print("[FrameReader] No URLs returned by yt-dlp")
return None
stream_url = urls[0]
print(f"[FrameReader] Resolved to stream: {stream_url}")
return stream_url
except subprocess.CalledProcessError as e:
stderr = e.stderr.strip()
print(f"[FrameReader] yt-dlp failed with error: {stderr}")
if "unavailable" in stderr:
print("[FrameReader] Video may be offline, private, or geo-restricted.")
return None
except Exception as e:
print(f"[FrameReader] Unexpected error resolving YouTube URL: {e}")
return None
return url
def run(self):
""" Read each frame of a video and store in buffer """
resolved = self._resolve_url(self.video_url)
self.cap = cv2.VideoCapture(resolved, cv2.CAP_FFMPEG)
if not self.cap.isOpened():
print(f"[FrameReader] Cannot open: {resolved}")
return
try:
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
except Exception:
pass
self.running.set()
# Count Time & FPS
src_fps = self.cap.get(cv2.CAP_PROP_FPS)
if not src_fps or src_fps <= 1e-3:
src_fps = float(self.fps) if self.fps else 30.0
frame_period = 1.0 / float(src_fps)
next_ts = time.monotonic()
# Read Frame Loop
while self.running.is_set():
ret, frame = self.cap.read()
if not ret:
if os.path.exists(self.video_url):
print(f"[FrameReader] End of file reached. Looping {self.video_url}")
self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
continue
time.sleep(0.5)
continue
if frame is not None:
self.buffer.append(frame)
# Time Sync
next_ts += frame_period
sleep_for = next_ts - time.monotonic()
if sleep_for > 0:
time.sleep(sleep_for)
else:
next_ts = time.monotonic()
if self.cap:
self.cap.release()
def stop(self):
self.running.clear()
self.join()
def read(self):
return self.buffer[-1].copy() if self.buffer else None