""" bake_stream.py - The LOGOS Encoder (Phase 4 - Fractal Round-Trip) Spatial tiling with proper heat code addressing for lossless reconstruction. Each tile's position is encoded into its heat code; payload contains raw pixel data. """ import math import time import cv2 import argparse import struct from typing import List, Tuple from .logos_core import ( calculate_heat_code, pack_atom, PAYLOAD_SIZE, ATOM_SIZE, META_SIZE, ) # ========================================== # FRACTAL TILE ADDRESSING # ========================================== def tile_to_quadtree_path(tile_row: int, tile_col: int, grid_rows: int, grid_cols: int) -> List[int]: """ Convert a tile's (row, col) position to a quadtree navigation path. This encodes the spatial position into 2-bit quadrant choices. Args: tile_row: Row index of tile (0-based) tile_col: Column index of tile (0-based) grid_rows: Total rows in grid grid_cols: Total columns in grid Returns: path: List of 2-bit quadrant choices (0=TL, 1=TR, 2=BL, 3=BR) """ path = [] r_start, r_end = 0, grid_rows c_start, c_end = 0, grid_cols # Binary subdivision: at each level, determine which quadrant the tile is in for _ in range(16): # Max 16 levels (32-bit heat code) if r_end - r_start <= 1 and c_end - c_start <= 1: break r_mid = (r_start + r_end) // 2 c_mid = (c_start + c_end) // 2 # Determine quadrant (00=TL, 01=TR, 10=BL, 11=BR) in_bottom = tile_row >= r_mid if r_mid < r_end else False in_right = tile_col >= c_mid if c_mid < c_end else False quadrant = (int(in_bottom) << 1) | int(in_right) path.append(quadrant) # Narrow search space if in_bottom: r_start = r_mid else: r_end = r_mid if in_right: c_start = c_mid else: c_end = c_mid return path def encode_tile_metadata(width: int, height: int, tile_row: int, tile_col: int, grid_rows: int, grid_cols: int) -> bytes: """ Encode image and tile metadata into first bytes of payload. Format: [img_w:2B][img_h:2B][tile_row:1B][tile_col:1B][grid_rows:1B][grid_cols:1B] = 8 bytes """ return struct.pack('>HHBBBB', width, height, tile_row, tile_col, grid_rows, grid_cols) def decode_tile_metadata(payload: bytes) -> Tuple[int, int, int, int, int, int]: """ Decode image and tile metadata from payload. Returns: (img_width, img_height, tile_row, tile_col, grid_rows, grid_cols) """ if len(payload) < 8: return (0, 0, 0, 0, 0, 0) return struct.unpack('>HHBBBB', payload[:8]) # ========================================== # TILE BAKER # ========================================== class LogosBaker: """ Phase 4 Baker: Fractal Tile Encoding for Round-Trip Reconstruction - Each tile's position is encoded into its heat code - Payload contains tile metadata + raw pixel data - Decoder can reconstruct original image exactly """ def __init__(self, source_path: str, event_callback=None): self.source_path = source_path self.event_callback = event_callback self.atoms: List[bytes] = [] # Load image img = cv2.imread(self.source_path) if img is None: raise ValueError(f"Could not load source: {self.source_path}") self.img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) self.height, self.width, self.channels = self.img.shape self._log(f"[LOGOS] Reality Ingested: {self.width}x{self.height}") self._log("[LOGOS] Mode: Fractal Tile Encoding (Round-Trip)") def _log(self, msg: str): print(msg) if self.event_callback: try: self.event_callback(msg) except Exception: pass def bake(self, output_path: str, grid_rows: int = 8, grid_cols: int = 8) -> dict: """ Bake image into SPCW atom stream. Args: output_path: Output .spcw file path grid_rows: Number of tile rows (default 8) grid_cols: Number of tile columns (default 8) """ start_time = time.perf_counter() self._log(f"[LOGOS] Dissolving Reality into {grid_rows}x{grid_cols} tiles...") # Calculate tile dimensions tile_h = math.ceil(self.height / grid_rows) tile_w = math.ceil(self.width / grid_cols) # Metadata for payload METADATA_SIZE = 8 PIXEL_DATA_SIZE = PAYLOAD_SIZE - META_SIZE - METADATA_SIZE atoms_out = [] # Process each tile for tr in range(grid_rows): for tc in range(grid_cols): # Extract tile region y0 = tr * tile_h y1 = min(self.height, y0 + tile_h) x0 = tc * tile_w x1 = min(self.width, x0 + tile_w) tile = self.img[y0:y1, x0:x1, :] # Compute quadtree path for this tile's position path = tile_to_quadtree_path(tr, tc, grid_rows, grid_cols) heat_code = calculate_heat_code(path) # Build payload: metadata + pixel data meta_bytes = encode_tile_metadata( self.width, self.height, tr, tc, grid_rows, grid_cols ) # Flatten tile pixels (RGB) tile_flat = tile.flatten() # Split tile into atoms (all atoms include metadata for decoding) chunk_idx = 0 offset = 0 while offset < len(tile_flat): chunk = tile_flat[offset:offset + PIXEL_DATA_SIZE] offset += PIXEL_DATA_SIZE # All atoms include metadata for proper decoding payload = meta_bytes + chunk.tobytes() # Heat code encodes tile position atom = pack_atom(heat_code, payload, domain_key="medium", gap_id=chunk_idx) atoms_out.append(atom) chunk_idx += 1 # Write stream with open(output_path, 'wb') as f: for atom in atoms_out: f.write(atom) # Stats elapsed = time.perf_counter() - start_time raw_bytes = self.width * self.height * self.channels baked_bytes = len(atoms_out) * ATOM_SIZE comp_ratio = (baked_bytes / raw_bytes) * 100 if raw_bytes else 0 stats = { "waves": grid_rows * grid_cols, "atoms": len(atoms_out), "baked_bytes": baked_bytes, "raw_bytes": raw_bytes, "compression_pct": comp_ratio, "elapsed_seconds": elapsed, "width": self.width, "height": self.height, "grid": (grid_rows, grid_cols), } self._log(f"[LOGOS] Tiles: {grid_rows}x{grid_cols} = {grid_rows * grid_cols}") self._log(f"[LOGOS] Atoms: {len(atoms_out)}") self._log(f"[LOGOS] Baked Size: {baked_bytes/1024:.2f} KB ({comp_ratio:.1f}% of raw)") self._log(f"[LOGOS] Time: {elapsed:.3f}s") self._log("[STATE] DETERMINISTIC | Dissolution Complete") self.atoms = atoms_out return {"state": {"state": "DETERMINISTIC", "prime": "Fractal Addressing"}, "stats": stats} def main(): parser = argparse.ArgumentParser(description="LOGOS Baker: Image -> SPCW Stream") parser.add_argument("input", help="Source Image") parser.add_argument("output", help="Output .spcw file") parser.add_argument("--grid", type=int, nargs=2, default=[8, 8], help="Grid dimensions (rows cols), default: 8 8") args = parser.parse_args() try: baker = LogosBaker(args.input) baker.bake(args.output, grid_rows=args.grid[0], grid_cols=args.grid[1]) except Exception as e: print(f"[ERROR] {e}") raise if __name__ == "__main__": main()