Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import cv2 | |
| import time | |
| import threading | |
| import subprocess | |
| from collections import deque | |
| from config import TUNING | |
| class FrameReader(threading.Thread): | |
| def __init__(self, video_url: str): | |
| super().__init__(daemon=True) | |
| self.video_url = video_url | |
| self.buffer = deque(maxlen=TUNING["FRAME_READER_BUFFER_SIZE"]) | |
| self.fps = TUNING["FRAME_READER_FPS"] | |
| self.running = threading.Event() | |
| self.cap = None | |
| self._is_file = self._looks_like_file(video_url) | |
| def _looks_like_file(self, url: str) -> bool: | |
| if os.path.exists(url): | |
| return True | |
| return not re.match(r'^[a-zA-Z]+://', url or "") | |
| def _resolve_url(self, url: str) -> str: | |
| if "youtube.com" in url or "youtu.be" in url: | |
| try: | |
| print(f"[FrameReader] Resolving YouTube URL: {url}") | |
| result = subprocess.run( | |
| ["yt-dlp", "--get-url", url], | |
| capture_output=True, text=True, check=True, timeout=30 | |
| ) | |
| urls = result.stdout.strip().splitlines() | |
| if not urls: | |
| print("[FrameReader] No URLs returned by yt-dlp") | |
| return None | |
| stream_url = urls[0] | |
| print(f"[FrameReader] Resolved to stream: {stream_url}") | |
| return stream_url | |
| except subprocess.CalledProcessError as e: | |
| stderr = e.stderr.strip() | |
| print(f"[FrameReader] yt-dlp failed with error: {stderr}") | |
| if "unavailable" in stderr: | |
| print("[FrameReader] Video may be offline, private, or geo-restricted.") | |
| return None | |
| except Exception as e: | |
| print(f"[FrameReader] Unexpected error resolving YouTube URL: {e}") | |
| return None | |
| return url | |
| def run(self): | |
| """ Read each frame of a video and store in buffer """ | |
| resolved = self._resolve_url(self.video_url) | |
| self.cap = cv2.VideoCapture(resolved, cv2.CAP_FFMPEG) | |
| if not self.cap.isOpened(): | |
| print(f"[FrameReader] Cannot open: {resolved}") | |
| return | |
| try: | |
| self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) | |
| except Exception: | |
| pass | |
| self.running.set() | |
| # Count Time & FPS | |
| src_fps = self.cap.get(cv2.CAP_PROP_FPS) | |
| if not src_fps or src_fps <= 1e-3: | |
| src_fps = float(self.fps) if self.fps else 30.0 | |
| frame_period = 1.0 / float(src_fps) | |
| next_ts = time.monotonic() | |
| # Read Frame Loop | |
| while self.running.is_set(): | |
| ret, frame = self.cap.read() | |
| if not ret: | |
| if os.path.exists(self.video_url): | |
| print(f"[FrameReader] End of file reached. Looping {self.video_url}") | |
| self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
| continue | |
| time.sleep(0.5) | |
| continue | |
| if frame is not None: | |
| self.buffer.append(frame) | |
| # Time Sync | |
| next_ts += frame_period | |
| sleep_for = next_ts - time.monotonic() | |
| if sleep_for > 0: | |
| time.sleep(sleep_for) | |
| else: | |
| next_ts = time.monotonic() | |
| if self.cap: | |
| self.cap.release() | |
| def stop(self): | |
| self.running.clear() | |
| self.join() | |
| def read(self): | |
| return self.buffer[-1].copy() if self.buffer else None |