Spaces:
Runtime error
Runtime error
| import queue | |
| import threading | |
| import cv2 | |
| class BufferlessVideoCapture: | |
| def __init__(self, name, width=None, height=None): | |
| self.cap = cv2.VideoCapture(name) | |
| if width is not None and height is not None: | |
| self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) | |
| self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) | |
| self.q = queue.Queue() | |
| t = threading.Thread(target=self._reader) | |
| t.daemon = True | |
| t.start() | |
| # read frames as soon as they are available, keeping only most recent one | |
| def _reader(self): | |
| while True: | |
| ret, frame = self.cap.read() | |
| if not ret: | |
| break | |
| if not self.q.empty(): | |
| try: | |
| self.q.get_nowait() # discard previous (unprocessed) frame | |
| except queue.Empty: | |
| pass | |
| self.q.put((ret, frame)) | |
| def read(self): | |
| return self.q.get() | |