""" video_stream.py - LOGOS Video Streaming via META/DELTA Heat Transmission META Frames: Full keyframes (high heat threshold crossed, scene changes) DELTA Frames: Frame-to-frame differences (temporal compression) Architecture: - Producer thread: Reads & encodes frames ahead of playback - Consumer thread: Displays at source FPS from buffer - First saturation: Initial buffering, then real-time streaming """ import cv2 import numpy as np import time import threading import queue from dataclasses import dataclass, field from typing import Optional, Callable, Tuple, List from concurrent.futures import ThreadPoolExecutor from .logos_core import ( calculate_heat_code, pack_atom, unpack_atom, ATOM_SIZE, PAYLOAD_SIZE, META_SIZE ) def tile_to_heat_code(row: int, col: int, rows: int, cols: int) -> int: """Convert tile position to heat code using quadtree path.""" path = [] r_start, r_end = 0, rows c_start, c_end = 0, cols for _ in range(16): if r_end - r_start <= 1 and c_end - c_start <= 1: break r_mid = (r_start + r_end) // 2 c_mid = (c_start + c_end) // 2 in_bottom = row >= r_mid if r_mid < r_end else False in_right = col >= c_mid if c_mid < c_end else False quadrant = (2 if in_bottom else 0) + (1 if in_right else 0) path.append(quadrant) if in_bottom: r_start = r_mid else: r_end = r_mid if in_right: c_start = c_mid else: c_end = c_mid return calculate_heat_code(path) # Heat thresholds - applied PER TILE (wave) # No IDLE - every wave always transmits (PERSIST/DELTA/FULL) TILE_PERSIST = 0.08 # < 8% change = persist (signal only, no pixel data) TILE_DELTA = 0.35 # 8-35% = delta transmission # > 35% = full tile transmission # Initial saturation buffer only SATURATION_BUFFER = 10 # Just enough for startup smoothing # Parallel wave processing WAVE_WORKERS = 32 # Parallel wave encoders @dataclass class FrameStats: """Stats for a single frame transmission""" frame_idx: int timestamp_ms: float frame_type: str # "META", "DELTA", "SKIP" delta_heat: float atoms_sent: int encode_ms: float @dataclass class VideoStats: """Aggregate video streaming stats""" total_frames: int = 0 meta_frames: int = 0 delta_frames: int = 0 skipped_frames: int = 0 total_atoms: int = 0 elapsed_ms: float = 0 avg_fps: float = 0 compression_ratio: float = 0 source_fps: float = 0 width: int = 0 height: int = 0 class VideoStreamBridge: """ LOGOS Video Streaming via META/DELTA Heat Protocol META = Keyframes (full frame, scene changes) DELTA = Temporal difference frames """ def __init__(self, num_workers: int = 16, viewport_size: Tuple[int, int] = (1280, 720), keyframe_interval: int = 60, # Force keyframe every N frames (1 sec at 60fps) persist_threshold: float = TILE_PERSIST, delta_threshold: float = TILE_DELTA): self.num_workers = num_workers self.viewport_size = viewport_size self.keyframe_interval = keyframe_interval self.persist_threshold = persist_threshold self.delta_threshold = delta_threshold self._stop_requested = False self._is_streaming = False # Frame buffers self.prev_frame: Optional[np.ndarray] = None self.canvas: Optional[np.ndarray] = None self.width = 0 self.height = 0 # Stats self.frame_stats: List[FrameStats] = [] def calculate_delta_heat(self, current: np.ndarray, previous: np.ndarray) -> Tuple[float, np.ndarray]: """ Calculate delta heat between frames using block-based comparison. More tolerant of minor noise/compression artifacts. Returns: (heat_ratio, delta_mask) """ if previous is None: return 1.0, np.ones(current.shape[:2], dtype=np.uint8) * 255 # Downsample for faster comparison (quarter resolution) h, w = current.shape[:2] small_h, small_w = h // 4, w // 4 curr_small = cv2.resize(current, (small_w, small_h), interpolation=cv2.INTER_AREA) prev_small = cv2.resize(previous, (small_w, small_h), interpolation=cv2.INTER_AREA) # Compute absolute difference on downsampled diff = cv2.absdiff(curr_small, prev_small) # Convert to grayscale if len(diff.shape) == 3: gray_diff = np.max(diff, axis=2) # Max channel diff (faster than cvtColor) else: gray_diff = diff # Higher threshold to ignore compression noise (20 instead of 10) _, delta_mask_small = cv2.threshold(gray_diff, 20, 255, cv2.THRESH_BINARY) # Calculate heat ratio changed_pixels = np.count_nonzero(delta_mask_small) total_pixels = delta_mask_small.size heat_ratio = changed_pixels / total_pixels # Upscale mask for tile-level decisions delta_mask = cv2.resize(delta_mask_small, (w, h), interpolation=cv2.INTER_NEAREST) return heat_ratio, delta_mask def classify_tile(self, tile_heat: float) -> str: """ Classify individual tile (wave) based on its local heat. Every wave ALWAYS transmits - fidelity is paramount. Returns: "PERSIST" (unchanged signal), "DELTA" (partial), or "FULL" (complete) """ if tile_heat < TILE_PERSIST: return "PERSIST" # Wave unchanged - send persist marker elif tile_heat < TILE_DELTA: return "DELTA" # Wave changed - send delta data else: return "FULL" # Wave changed significantly - send full data def encode_frame_waves(self, frame: np.ndarray, prev_frame: np.ndarray, timestamp_ms: float, is_keyframe: bool = False) -> Tuple[List[bytes], dict]: """ Encode frame using per-wave (tile) heat classification. Each wave independently decides: IDLE, DELTA, or FULL. Returns: (atoms, wave_stats) """ h, w = frame.shape[:2] tile_size = 256 # Larger tiles = fewer waves = faster processing rows = (h + tile_size - 1) // tile_size cols = (w + tile_size - 1) // tile_size wave_stats = {"persist": 0, "delta": 0, "full": 0} def encode_wave(args): row, col = args y0, x0 = row * tile_size, col * tile_size y1, x1 = min(y0 + tile_size, h), min(x0 + tile_size, w) tile = frame[y0:y1, x0:x1] # Calculate per-wave heat (higher noise threshold for video compression) if prev_frame is not None and not is_keyframe: prev_tile = prev_frame[y0:y1, x0:x1] diff = cv2.absdiff(tile, prev_tile) if len(diff.shape) == 3: gray_diff = np.max(diff, axis=2) else: gray_diff = diff changed = np.count_nonzero(gray_diff > 25) # Higher threshold for codec noise tile_heat = changed / max(gray_diff.size, 1) else: tile_heat = 1.0 # First frame or keyframe = full # Classify this wave - every wave transmits something wave_type = "FULL" if is_keyframe else self.classify_tile(tile_heat) import struct heat_code = tile_to_heat_code(row, col, rows, cols) if wave_type == "PERSIST": # Persist: minimal atom - just position marker, no pixel data # Type 2 = persist meta_header = struct.pack('>fHHB', timestamp_ms/1000, row, col, 2) atom = pack_atom(heat_code, meta_header, domain_key="video_delta", gap_id=0) return atom, "persist" # DELTA or FULL: encode tile (downsample for speed) if tile.shape[0] > 32 and tile.shape[1] > 32: tile_small = cv2.resize(tile, (tile.shape[1]//2, tile.shape[0]//2), interpolation=cv2.INTER_AREA) else: tile_small = tile tile_bytes = tile_small.tobytes() # Pack: timestamp, row, col, tile dimensions, wave type (0=full, 1=delta) type_byte = 0 if wave_type == "FULL" else 1 meta_header = struct.pack('>fHHBBB', timestamp_ms/1000, row, col, tile_small.shape[0], tile_small.shape[1], type_byte) METADATA_SIZE = 11 PIXEL_DATA_SIZE = PAYLOAD_SIZE - META_SIZE - METADATA_SIZE chunk = tile_bytes[:PIXEL_DATA_SIZE] payload = meta_header + chunk domain = "video_meta" if wave_type == "FULL" else "video_delta" atom = pack_atom(heat_code, payload, domain_key=domain, gap_id=0) return atom, "full" if wave_type == "FULL" else "delta" # Parallel wave processing tile_coords = [(r, c) for r in range(rows) for c in range(cols)] with ThreadPoolExecutor(max_workers=WAVE_WORKERS) as executor: results = list(executor.map(encode_wave, tile_coords)) # Collect results and stats - every wave produces an atom atoms = [] for atom, wave_type in results: atoms.append(atom) wave_stats[wave_type] += 1 return atoms, wave_stats def decode_frame_atoms(self, atoms: List[bytes], base_frame: np.ndarray) -> np.ndarray: """ Decode wave atoms back to frame. - PERSIST (type=2): No change, keep existing tile - DELTA (type=1): Update tile from delta data - FULL (type=0): Replace tile entirely """ import struct result = base_frame.copy() if base_frame is not None else np.zeros( (self.height, self.width, 3), dtype=np.uint8 ) tile_size = 256 # Match encode tile size for atom in atoms: heat_code, payload, domain_key, gap_id = unpack_atom(atom) if len(payload) < 7: # Minimum: ts(4) + row(2) + col(2) + type(1) = 9, but persist is shorter continue # Check for persist atom (shorter format) if len(payload) < 11: # Persist format: timestamp(4), row(2), col(2), type(1) = 9 bytes if len(payload) >= 9: ts, row, col, wave_type = struct.unpack('>fHHB', payload[:9]) if wave_type == 2: # PERSIST continue # Keep existing tile unchanged continue # Full/Delta format: timestamp(4), row(2), col(2), th(1), tw(1), type(1) ts, row, col, th, tw, wave_type = struct.unpack('>fHHBBB', payload[:11]) if wave_type == 2: # PERSIST - shouldn't happen here but just in case continue pixel_data = payload[11:] y0 = row * tile_size x0 = col * tile_size y1 = min(y0 + tile_size, self.height) x1 = min(x0 + tile_size, self.width) full_h = y1 - y0 full_w = x1 - x0 needed = th * tw * 3 if len(pixel_data) >= needed: try: small_tile = np.frombuffer(pixel_data[:needed], dtype=np.uint8) small_tile = small_tile.reshape(th, tw, 3) # Upscale to full tile size if th != full_h or tw != full_w: full_tile = cv2.resize(small_tile, (full_w, full_h), interpolation=cv2.INTER_NEAREST) else: full_tile = small_tile result[y0:y1, x0:x1] = full_tile except (ValueError, cv2.error): pass return result def stream(self, source_path: str, show_window: bool = True) -> VideoStats: """ Stream video using per-wave heat protocol. Architecture: - Initial saturation buffer (small) - Then real-time streaming at source FPS - Each frame: waves independently decide IDLE/DELTA/FULL - Idle waves don't transmit (maximum efficiency) """ self._stop_requested = False self._is_streaming = True cap = cv2.VideoCapture(source_path) if not cap.isOpened(): raise ValueError(f"Cannot open video: {source_path}") self.width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) source_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) frame_time = 1.0 / source_fps # Calculate wave grid tile_size = 128 wave_rows = (self.height + tile_size - 1) // tile_size wave_cols = (self.width + tile_size - 1) // tile_size total_waves = wave_rows * wave_cols print(f"[VIDEO] Source: {self.width}×{self.height} @ {source_fps:.1f}fps") print(f"[VIDEO] Waves: {wave_rows}×{wave_cols} = {total_waves} per frame") print(f"[VIDEO] Workers: {WAVE_WORKERS} | Saturation: {SATURATION_BUFFER} frames") print("-" * 50) # Initialize self.canvas = np.zeros((self.height, self.width, 3), dtype=np.uint8) prev_frame = None # Saturation buffer (small, just for startup) frame_buffer = queue.Queue(maxsize=SATURATION_BUFFER) encoding_done = threading.Event() # Stats stats = VideoStats(source_fps=source_fps, width=self.width, height=self.height) total_persist = [0] total_delta = [0] total_full = [0] total_atom_bytes = [0] # ========== PRODUCER: Encode at source rate ========== def producer(): nonlocal prev_frame frame_idx = 0 local_prev = None while not self._stop_requested: ret, frame = cap.read() if not ret: break timestamp_ms = (frame_idx / source_fps) * 1000 is_keyframe = (frame_idx == 0 or frame_idx % self.keyframe_interval == 0) # Per-wave encoding atoms, wave_stats = self.encode_frame_waves( frame, local_prev, timestamp_ms, is_keyframe ) total_persist[0] += wave_stats["persist"] total_delta[0] += wave_stats["delta"] total_full[0] += wave_stats["full"] total_atom_bytes[0] += len(atoms) * ATOM_SIZE stats.total_atoms += len(atoms) # Queue for display - include raw frame for keyframes try: frame_buffer.put({ 'idx': frame_idx, 'frame': frame if is_keyframe else None, # Raw frame for keyframes 'atoms': atoms, 'wave_stats': wave_stats, 'is_keyframe': is_keyframe, 'timestamp': timestamp_ms }, timeout=0.5) except queue.Full: pass local_prev = frame frame_idx += 1 stats.total_frames = frame_idx encoding_done.set() cap.release() producer_thread = threading.Thread(target=producer, daemon=True) producer_thread.start() # Window if show_window: cv2.namedWindow("LOGOS Video Stream", cv2.WINDOW_NORMAL) cv2.resizeWindow("LOGOS Video Stream", *self.viewport_size) # Initial saturation print("[VIDEO] Saturating...") while frame_buffer.qsize() < SATURATION_BUFFER and not encoding_done.is_set(): time.sleep(0.005) print(f"[VIDEO] Saturated. Streaming at {source_fps:.0f}fps...") start_time = time.perf_counter() display_idx = 0 last_log = start_time try: while not self._stop_requested: frame_start = time.perf_counter() try: data = frame_buffer.get(timeout=0.1) except queue.Empty: if encoding_done.is_set() and frame_buffer.empty(): break continue # Update canvas if data.get('is_keyframe') and data.get('frame') is not None: # Keyframe: use raw frame directly for perfect quality self.canvas = data['frame'] # No copy needed - producer moves on elif data['atoms']: # Filter out PERSIST atoms (they don't change canvas) # PERSIST atoms are small (< 11 bytes payload) active_atoms = [a for a in data['atoms'] if len(a) > 20] # Full atoms are larger if active_atoms: self.canvas = self.decode_frame_atoms(active_atoms, self.canvas) # Display with precise timing via waitKey if show_window: cv2.imshow("LOGOS Video Stream", self.canvas) # Calculate exact wait time in ms for this frame elapsed_ms = (time.perf_counter() - frame_start) * 1000 wait_ms = max(1, int(frame_time * 1000 - elapsed_ms)) key = cv2.waitKey(wait_ms) & 0xFF if key in (ord('q'), 27): break else: # No window - just maintain timing elapsed = time.perf_counter() - frame_start if elapsed < frame_time: time.sleep(frame_time - elapsed) display_idx += 1 # Log every 5 seconds (not every frame, not even every second) now = time.perf_counter() if now - last_log >= 5.0: actual_fps = display_idx / (now - start_time) print(f"[VIDEO] {display_idx}/{stats.total_frames} | {actual_fps:.1f}fps | " f"P:{total_persist[0]} Δ:{total_delta[0]} F:{total_full[0]}") last_log = now finally: self._stop_requested = True self._is_streaming = False producer_thread.join(timeout=1.0) if show_window: cv2.destroyAllWindows() # Final stats elapsed = time.perf_counter() - start_time stats.elapsed_ms = elapsed * 1000 stats.avg_fps = display_idx / elapsed if elapsed > 0 else 0 stats.meta_frames = total_full[0] stats.delta_frames = total_delta[0] stats.skipped_frames = total_persist[0] # Persist (not skipped, just unchanged) source_bytes = self.width * self.height * 3 * stats.total_frames stats.compression_ratio = source_bytes / max(total_atom_bytes[0], 1) total_waves = total_persist[0] + total_delta[0] + total_full[0] print("=" * 50) print(f"[VIDEO] Complete: {stats.total_frames} frames @ {stats.avg_fps:.1f}fps") print(f"[VIDEO] Waves: {total_waves} total") print(f"[VIDEO] PERSIST: {total_persist[0]} ({100*total_persist[0]/max(total_waves,1):.1f}%)") print(f"[VIDEO] DELTA: {total_delta[0]} ({100*total_delta[0]/max(total_waves,1):.1f}%)") print(f"[VIDEO] FULL: {total_full[0]} ({100*total_full[0]/max(total_waves,1):.1f}%)") print(f"[VIDEO] Compression: {stats.compression_ratio:.1f}x") return stats def stop(self): """Stop streaming""" self._stop_requested = True def is_streaming(self) -> bool: return self._is_streaming # ----------------- Audio Channel (Stub for future) ----------------- class AudioChannel: """ Separate audio channel for LOGOS video streaming. Audio is synchronized via timestamps, not interleaved with video. """ def __init__(self, sample_rate: int = 44100, chunk_size: int = 1024): self.sample_rate = sample_rate self.chunk_size = chunk_size self._audio_buffer = [] def extract_audio(self, video_path: str) -> Optional[np.ndarray]: """Extract audio track from video (requires ffmpeg)""" # TODO: Implement audio extraction # Use subprocess to call ffmpeg and extract raw PCM return None def encode_audio_chunk(self, audio_data: np.ndarray, timestamp_ms: float) -> bytes: """Encode audio chunk as atom""" # TODO: Implement audio encoding return b'' def decode_audio_chunk(self, atom: bytes) -> Tuple[np.ndarray, float]: """Decode audio atom""" # TODO: Implement audio decoding return np.array([]), 0.0 # ----------------- Test ----------------- if __name__ == "__main__": import sys if len(sys.argv) < 2: print("Usage: python video_stream.py ") sys.exit(1) video_path = sys.argv[1] bridge = VideoStreamBridge( num_workers=16, keyframe_interval=30 ) stats = bridge.stream(video_path, show_window=True) print(f"\nFinal: {stats.avg_fps:.1f} fps, {stats.compression_ratio:.1f}x compression")