Spaces:
Runtime error
Runtime error
| """ | |
| video_stream.py - LOGOS Video Streaming via META/DELTA Heat Transmission | |
| META Frames: Full keyframes (high heat threshold crossed, scene changes) | |
| DELTA Frames: Frame-to-frame differences (temporal compression) | |
| Architecture: | |
| - Producer thread: Reads & encodes frames ahead of playback | |
| - Consumer thread: Displays at source FPS from buffer | |
| - First saturation: Initial buffering, then real-time streaming | |
| """ | |
| import cv2 | |
| import numpy as np | |
| import time | |
| import threading | |
| import queue | |
| from dataclasses import dataclass, field | |
| from typing import Optional, Callable, Tuple, List | |
| from concurrent.futures import ThreadPoolExecutor | |
| from .logos_core import ( | |
| calculate_heat_code, pack_atom, unpack_atom, | |
| ATOM_SIZE, PAYLOAD_SIZE, META_SIZE | |
| ) | |
def tile_to_heat_code(row: int, col: int, rows: int, cols: int) -> int:
    """Map a tile's grid position to a heat code via its quadtree path.

    The ``rows`` x ``cols`` grid is bisected repeatedly around the tile at
    (``row``, ``col``). Each level contributes one quadrant index
    (+2 when the tile lies in the lower half, +1 when it lies in the right
    half). Descent stops once the remaining region is at most one tile in
    each direction, or after 16 levels. The resulting list of quadrant
    indices is handed to ``calculate_heat_code``.
    """
    top, bottom = 0, rows
    left, right = 0, cols
    quadrants = []
    depth = 0

    while depth < 16 and (bottom - top > 1 or right - left > 1):
        split_row = (top + bottom) // 2
        split_col = (left + right) // 2

        # A half only counts as "lower"/"right" when the split point lies
        # strictly inside the current region.
        lower = split_row < bottom and row >= split_row
        rightward = split_col < right and col >= split_col

        quadrants.append(2 * int(lower) + int(rightward))

        # Narrow the region to the half that contains the tile.
        if lower:
            top = split_row
        else:
            bottom = split_row
        if rightward:
            left = split_col
        else:
            right = split_col

        depth += 1

    return calculate_heat_code(quadrants)
# Heat thresholds - applied PER TILE (wave).
# "Heat" is the fraction of a tile's pixels that changed versus the previous frame.
# There is no IDLE class - every wave always transmits (PERSIST/DELTA/FULL).
TILE_PERSIST = 0.08  # heat < 8% = persist (marker atom only, no pixel data)
TILE_DELTA = 0.35  # heat in [8%, 35%) = delta transmission
# heat >= 35% = full tile transmission

# Capacity of the producer -> consumer frame queue; playback starts once it is full.
SATURATION_BUFFER = 10  # Just enough for startup smoothing

# Parallel wave processing
WAVE_WORKERS = 32  # Thread-pool size used when encoding the waves of one frame
@dataclass
class FrameStats:
    """Stats for a single frame transmission.

    Declared as a dataclass so instances can be built from field values;
    without the decorator the annotations below define no constructor
    arguments and no instance attributes.
    """

    # Index of the frame within the stream
    frame_idx: int
    # Presentation timestamp of the frame, in milliseconds
    timestamp_ms: float
    # One of "META", "DELTA", "SKIP"
    frame_type: str
    # Changed-pixel ratio measured against the previous frame
    delta_heat: float
    # Number of atoms emitted for this frame
    atoms_sent: int
    # Time spent encoding this frame, in milliseconds
    encode_ms: float
@dataclass
class VideoStats:
    """Aggregate video streaming stats.

    Declared as a dataclass so it can be constructed with keyword arguments
    (``VideoStats(source_fps=..., width=..., height=...)``) and so every
    field exists as an instance attribute that can be updated in place.
    """

    # Number of source frames processed
    total_frames: int = 0
    # Counters for the META / DELTA / skipped classes
    meta_frames: int = 0
    delta_frames: int = 0
    skipped_frames: int = 0
    # Total number of atoms emitted
    total_atoms: int = 0
    # Wall-clock streaming time in milliseconds
    elapsed_ms: float = 0
    # Displayed frames per second over the whole run
    avg_fps: float = 0
    # Raw source bytes divided by transmitted atom bytes
    compression_ratio: float = 0
    # Properties of the source video
    source_fps: float = 0
    width: int = 0
    height: int = 0
class VideoStreamBridge:
    """
    LOGOS Video Streaming via META/DELTA Heat Protocol

    META = Keyframes (full frame, scene changes)
    DELTA = Temporal difference frames

    Each frame is cut into square tiles ("waves"). Every wave is classified
    by its local heat (fraction of changed pixels) as PERSIST, DELTA or FULL
    and always produces exactly one atom.
    """

    # Edge length in pixels of one wave. Encoder, decoder and the wave-grid
    # log line must all agree on this value, so it lives in one place.
    TILE_SIZE = 256
    WINDOW_TITLE = "LOGOS Video Stream"

    # Wire headers (big-endian, no padding):
    #   full/delta: timestamp_s(f32) row(u16) col(u16) tile_h(u8) tile_w(u8) type(u8)
    #   persist:    timestamp_s(f32) row(u16) col(u16) type(u8)
    _FULL_HEADER = '>fHHBBB'
    _PERSIST_HEADER = '>fHHB'

    # Wave type bytes carried in the headers above.
    _TYPE_FULL = 0
    _TYPE_DELTA = 1
    _TYPE_PERSIST = 2

    def __init__(self,
                 num_workers: int = 16,
                 viewport_size: Tuple[int, int] = (1280, 720),
                 keyframe_interval: int = 60,  # Force keyframe every N frames (1 sec at 60fps)
                 persist_threshold: float = TILE_PERSIST,
                 delta_threshold: float = TILE_DELTA):
        """
        Args:
            num_workers: Size of the thread pool that encodes the waves of a frame.
            viewport_size: (width, height) of the preview window.
            keyframe_interval: Every N-th frame is sent as a keyframe; a value
                <= 0 means only the very first frame is a keyframe.
            persist_threshold: Tile heat below this is classified PERSIST.
            delta_threshold: Tile heat below this (and >= persist_threshold)
                is classified DELTA; anything else is FULL.
        """
        self.num_workers = num_workers
        self.viewport_size = viewport_size
        self.keyframe_interval = keyframe_interval
        self.persist_threshold = persist_threshold
        self.delta_threshold = delta_threshold
        self._stop_requested = False
        self._is_streaming = False
        # Frame buffers
        self.prev_frame: Optional[np.ndarray] = None
        self.canvas: Optional[np.ndarray] = None
        self.width = 0
        self.height = 0
        # Stats
        self.frame_stats: List[FrameStats] = []

    def calculate_delta_heat(self, current: np.ndarray,
                             previous: Optional[np.ndarray]) -> Tuple[float, np.ndarray]:
        """
        Calculate delta heat between frames on a quarter-resolution copy.

        A pixel counts as changed when its largest per-channel absolute
        difference exceeds 20, which ignores minor noise/compression artifacts.

        Returns: (heat_ratio, delta_mask) where heat_ratio is the fraction of
        changed pixels and delta_mask is a uint8 mask (0/255) at the resolution
        of ``current``. With no previous frame everything is reported as changed.
        """
        if previous is None:
            return 1.0, np.full(current.shape[:2], 255, dtype=np.uint8)

        # Downsample for faster comparison (quarter resolution). Clamp to one
        # pixel so frames smaller than 4 px per side do not ask for a 0-sized resize.
        h, w = current.shape[:2]
        small_h, small_w = max(h // 4, 1), max(w // 4, 1)
        curr_small = cv2.resize(current, (small_w, small_h), interpolation=cv2.INTER_AREA)
        prev_small = cv2.resize(previous, (small_w, small_h), interpolation=cv2.INTER_AREA)

        diff = cv2.absdiff(curr_small, prev_small)
        # Collapse colour channels: max channel diff (faster than cvtColor)
        gray_diff = np.max(diff, axis=2) if diff.ndim == 3 else diff

        # Threshold of 20 to ignore compression noise
        _, delta_mask_small = cv2.threshold(gray_diff, 20, 255, cv2.THRESH_BINARY)

        changed_pixels = np.count_nonzero(delta_mask_small)
        heat_ratio = changed_pixels / max(delta_mask_small.size, 1)

        # Upscale mask for tile-level decisions
        delta_mask = cv2.resize(delta_mask_small, (w, h), interpolation=cv2.INTER_NEAREST)
        return heat_ratio, delta_mask

    def classify_tile(self, tile_heat: float) -> str:
        """
        Classify an individual tile (wave) based on its local heat.

        Uses the thresholds this instance was configured with, so values
        passed to the constructor actually take effect.

        Returns: "PERSIST" (unchanged signal), "DELTA" (partial), or "FULL" (complete)
        """
        if tile_heat < self.persist_threshold:
            return "PERSIST"  # Wave unchanged - send persist marker
        if tile_heat < self.delta_threshold:
            return "DELTA"  # Wave changed - send delta data
        return "FULL"  # Wave changed significantly - send full data

    def encode_frame_waves(self, frame: np.ndarray, prev_frame: Optional[np.ndarray],
                           timestamp_ms: float, is_keyframe: bool = False) -> Tuple[List[bytes], dict]:
        """
        Encode a frame using per-wave (tile) heat classification.

        Each wave independently becomes PERSIST, DELTA or FULL and yields one
        atom. On a keyframe, or when there is no previous frame, every wave
        is FULL.

        Returns: (atoms, wave_stats) where wave_stats counts the waves per
        class under the keys "persist", "delta" and "full".
        """
        import struct

        h, w = frame.shape[:2]
        tile_size = self.TILE_SIZE
        rows = (h + tile_size - 1) // tile_size
        cols = (w + tile_size - 1) // tile_size
        timestamp_s = timestamp_ms / 1000

        # Pixel bytes that fit next to the header inside one atom payload.
        # NOTE(review): pixel data is truncated to this budget, and
        # decode_frame_atoms skips any atom carrying fewer than th*tw*3 pixel
        # bytes - so a tile only survives the round trip if it fits in one
        # atom. Confirm PAYLOAD_SIZE against the tile size.
        header_size = struct.calcsize(self._FULL_HEADER)
        pixel_budget = max(PAYLOAD_SIZE - META_SIZE - header_size, 0)

        force_full = is_keyframe or prev_frame is None

        def encode_wave(coords):
            row, col = coords
            y0, x0 = row * tile_size, col * tile_size
            y1, x1 = min(y0 + tile_size, h), min(x0 + tile_size, w)
            tile = frame[y0:y1, x0:x1]

            if force_full:
                wave_type = "FULL"  # First frame or keyframe = full
            else:
                prev_tile = prev_frame[y0:y1, x0:x1]
                diff = cv2.absdiff(tile, prev_tile)
                gray_diff = np.max(diff, axis=2) if diff.ndim == 3 else diff
                # Threshold of 25 to ignore codec noise
                changed = np.count_nonzero(gray_diff > 25)
                wave_type = self.classify_tile(changed / max(gray_diff.size, 1))

            heat_code = tile_to_heat_code(row, col, rows, cols)

            if wave_type == "PERSIST":
                # Persist: minimal atom - just a position marker, no pixel data
                marker = struct.pack(self._PERSIST_HEADER, timestamp_s, row, col,
                                     self._TYPE_PERSIST)
                return pack_atom(heat_code, marker, domain_key="video_delta", gap_id=0), "persist"

            # DELTA or FULL: halve the tile for speed. Thin edge tiles
            # (<= 32 px in one direction) are normally sent as-is, but must
            # still be halved when a side exceeds 255, because the header
            # stores each dimension in a single unsigned byte.
            tile_h, tile_w = tile.shape[:2]
            if (tile_h > 32 and tile_w > 32) or max(tile_h, tile_w) > 255:
                tile_small = cv2.resize(tile, (max(tile_w // 2, 1), max(tile_h // 2, 1)),
                                        interpolation=cv2.INTER_AREA)
            else:
                tile_small = tile

            is_full = wave_type == "FULL"
            header = struct.pack(self._FULL_HEADER, timestamp_s, row, col,
                                 tile_small.shape[0], tile_small.shape[1],
                                 self._TYPE_FULL if is_full else self._TYPE_DELTA)
            payload = header + tile_small.tobytes()[:pixel_budget]
            domain = "video_meta" if is_full else "video_delta"
            atom = pack_atom(heat_code, payload, domain_key=domain, gap_id=0)
            return atom, "full" if is_full else "delta"

        # Parallel wave processing; executor.map keeps results in tile order.
        tile_coords = [(r, c) for r in range(rows) for c in range(cols)]
        with ThreadPoolExecutor(max_workers=max(1, int(self.num_workers))) as executor:
            results = list(executor.map(encode_wave, tile_coords))

        # Collect results and stats - every wave produces an atom
        wave_stats = {"persist": 0, "delta": 0, "full": 0}
        atoms = []
        for atom, wave_kind in results:
            atoms.append(atom)
            wave_stats[wave_kind] += 1
        return atoms, wave_stats

    def decode_frame_atoms(self, atoms: List[bytes], base_frame: Optional[np.ndarray]) -> np.ndarray:
        """
        Decode wave atoms back onto a frame.

        - PERSIST (type=2): No change, keep existing tile
        - DELTA (type=1): Update tile from delta data
        - FULL (type=0): Replace tile entirely

        Works on a copy of ``base_frame`` (or a black frame of the current
        stream size when it is None) and returns that copy. Atoms that are
        too short, out of bounds or otherwise malformed are skipped.
        """
        import struct

        if base_frame is not None:
            result = base_frame.copy()
        else:
            result = np.zeros((self.height, self.width, 3), dtype=np.uint8)

        # Tile bounds follow the frame actually being written to.
        frame_h, frame_w = result.shape[:2]
        tile_size = self.TILE_SIZE  # Must match the encoder's tile size
        header_size = struct.calcsize(self._FULL_HEADER)

        for atom in atoms:
            _heat_code, payload, _domain_key, _gap_id = unpack_atom(atom)

            # Anything shorter than a full/delta header is either a persist
            # marker (9 bytes) or truncated; neither changes the canvas.
            if len(payload) < header_size:
                continue

            _ts, row, col, th, tw, wave_type = struct.unpack(
                self._FULL_HEADER, payload[:header_size])
            if wave_type == self._TYPE_PERSIST:
                continue

            y0, x0 = row * tile_size, col * tile_size
            if th == 0 or tw == 0 or y0 >= frame_h or x0 >= frame_w:
                continue
            y1 = min(y0 + tile_size, frame_h)
            x1 = min(x0 + tile_size, frame_w)
            full_h, full_w = y1 - y0, x1 - x0

            pixel_data = payload[header_size:]
            needed = th * tw * 3
            if len(pixel_data) < needed:
                continue

            try:
                small_tile = np.frombuffer(pixel_data[:needed], dtype=np.uint8).reshape(th, tw, 3)
                # Upscale to full tile size if the encoder downsampled it
                if (th, tw) != (full_h, full_w):
                    full_tile = cv2.resize(small_tile, (full_w, full_h),
                                           interpolation=cv2.INTER_NEAREST)
                else:
                    full_tile = small_tile
                result[y0:y1, x0:x1] = full_tile
            except (ValueError, cv2.error):
                # Deliberately best-effort: a tile that cannot be placed is
                # dropped and the previous content stays on screen.
                continue
        return result

    def stream(self, source_path: str, show_window: bool = True) -> VideoStats:
        """
        Stream a video file using the per-wave heat protocol.

        Architecture:
        - A producer thread reads and encodes frames into a bounded queue
        - The calling thread waits for the initial saturation buffer, then
          decodes and displays at source FPS
        - Each frame: waves independently become PERSIST/DELTA/FULL

        Args:
            source_path: Path handed to cv2.VideoCapture.
            show_window: Display frames in an OpenCV window ('q' or Esc quits);
                when False only the frame timing is maintained.

        Returns: aggregate VideoStats for the run.
        Raises: ValueError if the source cannot be opened.
        """
        cap = cv2.VideoCapture(source_path)
        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Cannot open video: {source_path}")

        # Only flag the stream as running once the source is known to be usable.
        self._stop_requested = False
        self._is_streaming = True

        self.width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        source_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        frame_time = 1.0 / source_fps

        # Wave grid, reported with the same tile size the encoder uses
        tile_size = self.TILE_SIZE
        wave_rows = (self.height + tile_size - 1) // tile_size
        wave_cols = (self.width + tile_size - 1) // tile_size
        worker_count = max(1, int(self.num_workers))
        print(f"[VIDEO] Source: {self.width}×{self.height} @ {source_fps:.1f}fps")
        print(f"[VIDEO] Waves: {wave_rows}×{wave_cols} = {wave_rows * wave_cols} per frame")
        print(f"[VIDEO] Workers: {worker_count} | Saturation: {SATURATION_BUFFER} frames")
        print("-" * 50)

        self.canvas = np.zeros((self.height, self.width, 3), dtype=np.uint8)

        # Saturation buffer (small, just for startup)
        frame_buffer = queue.Queue(maxsize=SATURATION_BUFFER)
        encoding_done = threading.Event()

        stats = VideoStats(source_fps=source_fps, width=self.width, height=self.height)
        # Written only by the producer thread, read by this thread for logging.
        totals = {"persist": 0, "delta": 0, "full": 0, "atom_bytes": 0}

        # ========== PRODUCER: read + encode ahead of playback ==========
        def producer():
            frame_idx = 0
            local_prev = None
            interval = self.keyframe_interval
            try:
                while not self._stop_requested:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    timestamp_ms = (frame_idx / source_fps) * 1000
                    is_keyframe = frame_idx == 0 or (interval > 0 and frame_idx % interval == 0)

                    atoms, wave_stats = self.encode_frame_waves(
                        frame, local_prev, timestamp_ms, is_keyframe
                    )
                    for wave_kind in ("persist", "delta", "full"):
                        totals[wave_kind] += wave_stats[wave_kind]
                    totals["atom_bytes"] += len(atoms) * ATOM_SIZE
                    stats.total_atoms += len(atoms)

                    item = {
                        'idx': frame_idx,
                        'frame': frame if is_keyframe else None,  # Raw frame for keyframes
                        'atoms': atoms,
                        'wave_stats': wave_stats,
                        'is_keyframe': is_keyframe,
                        'timestamp': timestamp_ms
                    }
                    # Wait for room instead of dropping the frame: a lost
                    # delta or keyframe would leave the canvas out of sync.
                    # The timeout keeps the loop responsive to stop requests.
                    while not self._stop_requested:
                        try:
                            frame_buffer.put(item, timeout=0.5)
                            break
                        except queue.Full:
                            continue

                    local_prev = frame
                    frame_idx += 1
                    stats.total_frames = frame_idx
            finally:
                # Always signal completion, even on error, so the consumer
                # loop can terminate; then free the capture.
                encoding_done.set()
                cap.release()

        producer_thread = threading.Thread(target=producer, daemon=True)
        producer_thread.start()

        start_time = time.perf_counter()
        display_idx = 0
        try:
            if show_window:
                cv2.namedWindow(self.WINDOW_TITLE, cv2.WINDOW_NORMAL)
                cv2.resizeWindow(self.WINDOW_TITLE, *self.viewport_size)

            # Initial saturation
            print("[VIDEO] Saturating...")
            while frame_buffer.qsize() < SATURATION_BUFFER and not encoding_done.is_set():
                time.sleep(0.005)
            print(f"[VIDEO] Saturated. Streaming at {source_fps:.0f}fps...")

            start_time = time.perf_counter()
            last_log = start_time

            while not self._stop_requested:
                frame_start = time.perf_counter()
                try:
                    data = frame_buffer.get(timeout=0.1)
                except queue.Empty:
                    if encoding_done.is_set() and frame_buffer.empty():
                        break
                    continue

                # Update canvas
                if data.get('is_keyframe') and data.get('frame') is not None:
                    # Keyframe: use raw frame directly for perfect quality.
                    # No copy needed - the producer does not modify it.
                    self.canvas = data['frame']
                elif data['atoms']:
                    # NOTE(review): presumably meant to skip persist atoms;
                    # whether atom length varies depends on pack_atom - TODO
                    # confirm. decode_frame_atoms ignores persist atoms anyway.
                    active_atoms = [a for a in data['atoms'] if len(a) > 20]
                    if active_atoms:
                        self.canvas = self.decode_frame_atoms(active_atoms, self.canvas)

                if show_window:
                    cv2.imshow(self.WINDOW_TITLE, self.canvas)
                    # waitKey doubles as the frame pacer: wait out the rest
                    # of this frame's time slot (at least 1 ms).
                    elapsed_ms = (time.perf_counter() - frame_start) * 1000
                    wait_ms = max(1, int(frame_time * 1000 - elapsed_ms))
                    key = cv2.waitKey(wait_ms) & 0xFF
                    if key in (ord('q'), 27):
                        break
                else:
                    # No window - just maintain timing
                    spent = time.perf_counter() - frame_start
                    if spent < frame_time:
                        time.sleep(frame_time - spent)
                display_idx += 1

                # Log every 5 seconds
                now = time.perf_counter()
                if now - last_log >= 5.0:
                    actual_fps = display_idx / (now - start_time)
                    print(f"[VIDEO] {display_idx}/{stats.total_frames} | {actual_fps:.1f}fps | "
                          f"P:{totals['persist']} Δ:{totals['delta']} F:{totals['full']}")
                    last_log = now
        finally:
            self._stop_requested = True
            self._is_streaming = False
            producer_thread.join(timeout=1.0)
            if show_window:
                cv2.destroyAllWindows()

        # Final stats
        elapsed = time.perf_counter() - start_time
        stats.elapsed_ms = elapsed * 1000
        stats.avg_fps = display_idx / elapsed if elapsed > 0 else 0
        # These three fields hold per-wave totals, not frame counts.
        stats.meta_frames = totals["full"]
        stats.delta_frames = totals["delta"]
        stats.skipped_frames = totals["persist"]  # Persist (not skipped, just unchanged)
        source_bytes = self.width * self.height * 3 * stats.total_frames
        stats.compression_ratio = source_bytes / max(totals["atom_bytes"], 1)

        total_waves = totals["persist"] + totals["delta"] + totals["full"]
        wave_divisor = max(total_waves, 1)
        print("=" * 50)
        print(f"[VIDEO] Complete: {stats.total_frames} frames @ {stats.avg_fps:.1f}fps")
        print(f"[VIDEO] Waves: {total_waves} total")
        print(f"[VIDEO] PERSIST: {totals['persist']} ({100*totals['persist']/wave_divisor:.1f}%)")
        print(f"[VIDEO] DELTA: {totals['delta']} ({100*totals['delta']/wave_divisor:.1f}%)")
        print(f"[VIDEO] FULL: {totals['full']} ({100*totals['full']/wave_divisor:.1f}%)")
        print(f"[VIDEO] Compression: {stats.compression_ratio:.1f}x")
        return stats

    def stop(self):
        """Request that a running stream() stops after the current frame."""
        self._stop_requested = True

    def is_streaming(self) -> bool:
        """Return True while stream() is running."""
        return self._is_streaming
| # ----------------- Audio Channel (Stub for future) ----------------- | |
class AudioChannel:
    """
    Placeholder audio channel for LOGOS video streaming.

    Audio is meant to travel separately from the video atoms and be lined up
    with them by timestamp. None of the methods is implemented yet; each one
    returns an empty placeholder value.
    """

    def __init__(self, sample_rate: int = 44100, chunk_size: int = 1024):
        """Remember the audio parameters and start with an empty buffer."""
        self.sample_rate, self.chunk_size = sample_rate, chunk_size
        self._audio_buffer = []

    def extract_audio(self, video_path: str) -> Optional[np.ndarray]:
        """Extract the audio track of a video (not implemented; returns None)."""
        # TODO: Implement audio extraction
        # Use subprocess to call ffmpeg and extract raw PCM
        extracted = None
        return extracted

    def encode_audio_chunk(self, audio_data: np.ndarray, timestamp_ms: float) -> bytes:
        """Encode an audio chunk as an atom (not implemented; returns empty bytes)."""
        # TODO: Implement audio encoding
        encoded = b''
        return encoded

    def decode_audio_chunk(self, atom: bytes) -> Tuple[np.ndarray, float]:
        """Decode an audio atom (not implemented; returns an empty array and 0.0)."""
        # TODO: Implement audio decoding
        samples = np.array([])
        timestamp = 0.0
        return samples, timestamp
| # ----------------- Test ----------------- | |
if __name__ == "__main__":
    import sys

    # Manual test entry point: stream the video given as the first
    # command-line argument into a preview window, then print a summary.
    if len(sys.argv) <= 1:
        print("Usage: python video_stream.py <video_path>")
        sys.exit(1)

    video_path = sys.argv[1]
    bridge = VideoStreamBridge(num_workers=16, keyframe_interval=30)
    stats = bridge.stream(video_path, show_window=True)

    summary = f"\nFinal: {stats.avg_fps:.1f} fps, {stats.compression_ratio:.1f}x compression"
    print(summary)