Spaces:
Runtime error
Runtime error
| """ | |
| bake_stream.py - The LOGOS Encoder (Phase 4 - Fractal Round-Trip) | |
| Spatial tiling with proper heat code addressing for lossless reconstruction. | |
| Each tile's position is encoded into its heat code; payload contains raw pixel data. | |
| """ | |
| import math | |
| import time | |
| import cv2 | |
| import argparse | |
| import struct | |
| from typing import List, Tuple | |
| from .logos_core import ( | |
| calculate_heat_code, | |
| pack_atom, | |
| PAYLOAD_SIZE, | |
| ATOM_SIZE, | |
| META_SIZE, | |
| ) | |
| # ========================================== | |
| # FRACTAL TILE ADDRESSING | |
| # ========================================== | |
def tile_to_quadtree_path(tile_row: int, tile_col: int, grid_rows: int, grid_cols: int) -> List[int]:
    """
    Translate a tile's (row, col) grid position into quadtree navigation steps.

    The grid is halved along both axes at every level, and each level
    contributes one 2-bit value naming the quadrant that holds the tile.

    Args:
        tile_row: Row index of the tile (0-based).
        tile_col: Column index of the tile (0-based).
        grid_rows: Total number of rows in the grid.
        grid_cols: Total number of columns in the grid.

    Returns:
        Quadrant choices, outermost level first (0=TL, 1=TR, 2=BL, 3=BR).
        The list is empty when the grid spans at most one tile on each axis
        and never holds more than 16 entries.
    """
    row_lo, row_hi = 0, grid_rows
    col_lo, col_hi = 0, grid_cols
    steps = []
    depth = 0

    # At most 16 levels (2 bits each -> 32-bit heat code); stop early once
    # both axes have been narrowed down to a single tile.
    while depth < 16 and (row_hi - row_lo > 1 or col_hi - col_lo > 1):
        row_split = (row_lo + row_hi) // 2
        col_split = (col_lo + col_hi) // 2

        # A half only counts as "bottom"/"right" when the split point lies
        # inside the interval. Once an axis is down to one tile the split
        # equals the interval start, so an in-range tile keeps reporting the
        # bottom/right half on that axis.
        lower_half = row_split < row_hi and tile_row >= row_split
        right_half = col_split < col_hi and tile_col >= col_split

        # High bit = bottom, low bit = right.
        steps.append(2 * int(lower_half) + int(right_half))

        # Shrink the search window to the chosen quadrant.
        if lower_half:
            row_lo = row_split
        else:
            row_hi = row_split
        if right_half:
            col_lo = col_split
        else:
            col_hi = col_split

        depth += 1

    return steps
def encode_tile_metadata(width: int, height: int, tile_row: int, tile_col: int,
                         grid_rows: int, grid_cols: int) -> bytes:
    """
    Serialize image and tile metadata into the 8-byte header that leads a payload.

    Layout (big-endian):
        [img_w:2B][img_h:2B][tile_row:1B][tile_col:1B][grid_rows:1B][grid_cols:1B]

    Raises:
        struct.error: If a value does not fit its field (image dimensions are
            unsigned 16-bit, tile/grid values are unsigned 8-bit).
    """
    header_fields = (width, height, tile_row, tile_col, grid_rows, grid_cols)
    return struct.pack('>HHBBBB', *header_fields)
def decode_tile_metadata(payload: bytes) -> Tuple[int, int, int, int, int, int]:
    """
    Parse the 8-byte metadata header at the start of a payload.

    Only the first 8 bytes are read; anything after them is ignored.

    Returns:
        (img_width, img_height, tile_row, tile_col, grid_rows, grid_cols).
        A payload shorter than 8 bytes yields six zeros instead of an error.
    """
    header_len = 8
    if len(payload) >= header_len:
        return struct.unpack('>HHBBBB', payload[:header_len])
    # Too short to hold a header: report an all-zero record.
    return (0, 0, 0, 0, 0, 0)
| # ========================================== | |
| # TILE BAKER | |
| # ========================================== | |
class LogosBaker:
    """
    Phase 4 Baker: Fractal Tile Encoding for Round-Trip Reconstruction

    - Each tile's position is encoded into its heat code
    - Payload contains tile metadata + raw pixel data
    - Intended to let a decoder rebuild the original image (the decoder is
      not part of this file)
    """

    def __init__(self, source_path: str, event_callback=None):
        """
        Load the source image and record its dimensions.

        Args:
            source_path: Path of the image to read with cv2.imread.
            event_callback: Optional callable that receives every log message.

        Raises:
            ValueError: If cv2.imread cannot load the file.
        """
        self.source_path = source_path
        self.event_callback = event_callback
        # Atoms produced by the most recent bake() call.
        self.atoms: List[bytes] = []

        # Load image
        img = cv2.imread(self.source_path)
        if img is None:
            raise ValueError(f"Could not load source: {self.source_path}")

        # The stream stores pixels in RGB order, so convert from BGR.
        self.img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.height, self.width, self.channels = self.img.shape

        self._log(f"[LOGOS] Reality Ingested: {self.width}x{self.height}")
        self._log("[LOGOS] Mode: Fractal Tile Encoding (Round-Trip)")

    def _log(self, msg: str):
        """Print a message and forward it to the optional event callback."""
        print(msg)
        if self.event_callback:
            try:
                self.event_callback(msg)
            except Exception:
                # Deliberate best-effort: a failing listener must never
                # abort the bake.
                pass

    def bake(self, output_path: str, grid_rows: int = 8, grid_cols: int = 8) -> dict:
        """
        Bake image into SPCW atom stream.

        The image is cut into a grid_rows x grid_cols grid. Every tile's raw
        RGB bytes are split into chunks, each chunk is prefixed with the
        8-byte tile metadata and packed into one atom, and all atoms are
        written to output_path in row-major tile order. Tiles that fall
        entirely outside the image (possible when the grid is finer than the
        image) produce no atoms.

        Args:
            output_path: Output .spcw file path
            grid_rows: Number of tile rows (default 8)
            grid_cols: Number of tile columns (default 8)

        Returns:
            Dict with a "state" entry and a "stats" entry (tile/atom counts,
            byte sizes, size ratio, elapsed seconds, image size, grid).

        Raises:
            ValueError: If a grid dimension is not positive, or if the atom
                payload leaves no room for pixel data after the headers.
        """
        # Fail fast: zero would divide by zero below, and a negative value
        # would silently write an empty stream.
        if grid_rows < 1 or grid_cols < 1:
            raise ValueError(
                f"Grid dimensions must be positive, got {grid_rows}x{grid_cols}"
            )

        start_time = time.perf_counter()
        self._log(f"[LOGOS] Dissolving Reality into {grid_rows}x{grid_cols} tiles...")

        # Calculate tile dimensions (rounded up so the grid covers the image)
        tile_h = math.ceil(self.height / grid_rows)
        tile_w = math.ceil(self.width / grid_cols)

        # Bytes taken by the tile metadata header at the front of each payload.
        metadata_size = 8
        # NOTE(review): META_SIZE comes from logos_core; presumably space the
        # atom format reserves inside the payload - confirm against pack_atom.
        pixel_data_size = PAYLOAD_SIZE - META_SIZE - metadata_size
        if pixel_data_size <= 0:
            # A non-positive chunk size would make the chunking below
            # meaningless (the original loop never advanced in this case).
            raise ValueError(
                f"Payload too small for pixel data: {pixel_data_size} bytes available"
            )

        atoms_out: List[bytes] = []

        # Process each tile
        for tr in range(grid_rows):
            y0 = tr * tile_h
            y1 = min(self.height, y0 + tile_h)
            for tc in range(grid_cols):
                x0 = tc * tile_w
                x1 = min(self.width, x0 + tile_w)
                tile = self.img[y0:y1, x0:x1, :]

                # Heat code encodes the tile's position in the grid
                path = tile_to_quadtree_path(tr, tc, grid_rows, grid_cols)
                heat_code = calculate_heat_code(path)

                # Every atom of the tile carries the same metadata header
                meta_bytes = encode_tile_metadata(
                    self.width, self.height, tr, tc, grid_rows, grid_cols
                )

                # Flatten tile pixels (RGB) and emit one atom per chunk;
                # gap_id is the chunk's index within its tile.
                tile_flat = tile.flatten()
                chunk_starts = range(0, len(tile_flat), pixel_data_size)
                for chunk_idx, offset in enumerate(chunk_starts):
                    chunk = tile_flat[offset:offset + pixel_data_size]
                    payload = meta_bytes + chunk.tobytes()
                    atoms_out.append(
                        pack_atom(heat_code, payload, domain_key="medium", gap_id=chunk_idx)
                    )

        # Write stream
        with open(output_path, 'wb') as f:
            f.writelines(atoms_out)

        # Stats
        elapsed = time.perf_counter() - start_time
        raw_bytes = self.width * self.height * self.channels
        baked_bytes = len(atoms_out) * ATOM_SIZE
        # Baked size as a percentage of the raw pixel size.
        comp_ratio = (baked_bytes / raw_bytes) * 100 if raw_bytes else 0
        stats = {
            "waves": grid_rows * grid_cols,
            "atoms": len(atoms_out),
            "baked_bytes": baked_bytes,
            "raw_bytes": raw_bytes,
            "compression_pct": comp_ratio,
            "elapsed_seconds": elapsed,
            "width": self.width,
            "height": self.height,
            "grid": (grid_rows, grid_cols),
        }

        self._log(f"[LOGOS] Tiles: {grid_rows}x{grid_cols} = {grid_rows * grid_cols}")
        self._log(f"[LOGOS] Atoms: {len(atoms_out)}")
        self._log(f"[LOGOS] Baked Size: {baked_bytes/1024:.2f} KB ({comp_ratio:.1f}% of raw)")
        self._log(f"[LOGOS] Time: {elapsed:.3f}s")
        self._log("[STATE] DETERMINISTIC | Dissolution Complete")

        self.atoms = atoms_out
        return {"state": {"state": "DETERMINISTIC", "prime": "Fractal Addressing"}, "stats": stats}
def main():
    """
    Command-line entry point: bake an image file into an SPCW stream.

    Positional arguments are the source image and the output .spcw path;
    --grid takes two integers (rows, cols) and defaults to 8 8. Any failure
    is printed with an [ERROR] prefix and then re-raised.
    """
    cli = argparse.ArgumentParser(description="LOGOS Baker: Image -> SPCW Stream")
    cli.add_argument("input", help="Source Image")
    cli.add_argument("output", help="Output .spcw file")
    cli.add_argument(
        "--grid",
        type=int,
        nargs=2,
        default=[8, 8],
        help="Grid dimensions (rows cols), default: 8 8",
    )
    opts = cli.parse_args()
    rows, cols = opts.grid

    try:
        LogosBaker(opts.input).bake(opts.output, grid_rows=rows, grid_cols=cols)
    except Exception as err:
        # Report the failure, then let it propagate with its traceback.
        print(f"[ERROR] {err}")
        raise


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()