# LOGOS-SPCW-Matroska / logos / bake_stream.py
# GitHub Copilot
# Protocol 22: Terminology Cleanup & Final Manifold Alignment
# b8b5c5a
"""
bake_stream.py - The LOGOS Encoder (Phase 4 - Fractal Round-Trip)
Spatial tiling with proper heat code addressing for lossless reconstruction.
Each tile's position is encoded into its heat code; payload contains raw pixel data.
"""
import math
import time
import cv2
import argparse
import struct
from typing import List, Tuple
from .logos_core import (
calculate_heat_code,
pack_atom,
PAYLOAD_SIZE,
ATOM_SIZE,
META_SIZE,
)
# ==========================================
# FRACTAL TILE ADDRESSING
# ==========================================
def tile_to_quadtree_path(tile_row: int, tile_col: int, grid_rows: int, grid_cols: int) -> List[int]:
    """
    Translate a tile's grid coordinates into a list of quadrant choices.

    The grid is halved along both axes at every level and the quadrant that
    contains the tile is recorded, until the remaining window spans at most
    one row and one column (or 16 levels have been emitted).

    Args:
        tile_row: Row index of the tile (0-based).
        tile_col: Column index of the tile (0-based).
        grid_rows: Number of rows in the tile grid.
        grid_cols: Number of columns in the tile grid.

    Returns:
        One integer per level, each built as ``(bottom << 1) | right``:
        0=TL, 1=TR, 2=BL, 3=BR.

    Note:
        When one axis has already narrowed to a single index while the other
        still spans several, the midpoint of the narrowed axis equals its
        start, so an in-range tile is reported as "bottom"/"right" (bit = 1)
        on that axis for the remaining levels.
    """
    row_lo, row_hi = 0, grid_rows
    col_lo, col_hi = 0, grid_cols
    steps: List[int] = []

    depth = 0
    # 16 levels x 2 bits per level = 32 bits (presumably the heat code width).
    while depth < 16 and (row_hi - row_lo > 1 or col_hi - col_lo > 1):
        row_split = (row_lo + row_hi) // 2
        col_split = (col_lo + col_hi) // 2

        # A split that coincides with the upper bound (empty window) never
        # counts as bottom/right.
        go_down = row_split < row_hi and tile_row >= row_split
        go_right = col_split < col_hi and tile_col >= col_split
        steps.append(2 * int(go_down) + int(go_right))

        # Keep only the half of each axis that holds the tile.
        if go_down:
            row_lo = row_split
        else:
            row_hi = row_split
        if go_right:
            col_lo = col_split
        else:
            col_hi = col_split

        depth += 1

    return steps
def encode_tile_metadata(width: int, height: int, tile_row: int, tile_col: int,
                         grid_rows: int, grid_cols: int) -> bytes:
    """
    Pack image size, tile position and grid shape into an 8-byte header.

    Layout (big-endian):
        [img_w:2B][img_h:2B][tile_row:1B][tile_col:1B][grid_rows:1B][grid_cols:1B]

    Raises:
        struct.error: If a value does not fit its field (16-bit unsigned for
            the image size, 8-bit unsigned for the tile/grid fields).
    """
    image_size = (width, height)
    tile_position = (tile_row, tile_col)
    grid_shape = (grid_rows, grid_cols)
    return struct.pack('>HHBBBB', *image_size, *tile_position, *grid_shape)
def decode_tile_metadata(payload: bytes) -> Tuple[int, int, int, int, int, int]:
    """
    Read the 8-byte tile header from the start of a payload.

    Returns:
        (img_width, img_height, tile_row, tile_col, grid_rows, grid_cols).
        A payload shorter than 8 bytes yields six zeros instead of raising.
    """
    if len(payload) >= 8:
        # Only the leading 8 bytes form the header; the rest is pixel data.
        return struct.unpack('>HHBBBB', payload[:8])
    return (0, 0, 0, 0, 0, 0)
# ==========================================
# TILE BAKER
# ==========================================
class LogosBaker:
    """
    Phase 4 Baker: Fractal Tile Encoding for Round-Trip Reconstruction

    - Each tile's grid position is encoded into its heat code
    - Every atom payload carries the tile metadata header + raw RGB bytes
    - Intended to let a decoder rebuild the original image from the stream
    """

    def __init__(self, source_path: str, event_callback=None):
        """
        Load the source image and keep it in memory as RGB.

        Args:
            source_path: Path of the image, read with ``cv2.imread``.
            event_callback: Optional callable that receives every log message.

        Raises:
            ValueError: If ``cv2.imread`` could not load the image.
        """
        self.source_path = source_path
        self.event_callback = event_callback
        self.atoms: List[bytes] = []

        img = cv2.imread(self.source_path)
        if img is None:
            raise ValueError(f"Could not load source: {self.source_path}")

        # OpenCV returns BGR channel order; the stream stores RGB.
        self.img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.height, self.width, self.channels = self.img.shape
        self._log(f"[LOGOS] Reality Ingested: {self.width}x{self.height}")
        self._log("[LOGOS] Mode: Fractal Tile Encoding (Round-Trip)")

    def _log(self, msg: str):
        """Print ``msg`` and forward it to the event callback, if one is set."""
        print(msg)
        if self.event_callback:
            try:
                self.event_callback(msg)
            except Exception:
                # Deliberate best-effort: a faulty listener must not abort
                # the bake.
                pass

    def bake(self, output_path: str, grid_rows: int = 8, grid_cols: int = 8) -> dict:
        """
        Bake the image into an SPCW atom stream and write it to disk.

        The image is cut into ``grid_rows`` x ``grid_cols`` tiles. Each tile's
        flattened RGB bytes are split into chunks, and every chunk is packed
        into one atom whose payload starts with the 8-byte tile metadata.

        Args:
            output_path: Output .spcw file path
            grid_rows: Number of tile rows (default 8)
            grid_cols: Number of tile columns (default 8)

        Returns:
            A dict with a ``"state"`` entry and a ``"stats"`` entry (tile and
            atom counts, byte sizes, baked/raw percentage, elapsed seconds,
            image size and grid shape).

        Raises:
            ValueError: If a grid dimension is outside 1..255 or an image
                side exceeds 65535; those are the limits of the metadata
                header fields.
            RuntimeError: If the atom payload leaves no room for pixel data.
        """
        # Validate before doing any work. Previously 0 raised a bare
        # ZeroDivisionError, negative values silently produced an empty
        # stream, and oversized values failed inside struct.pack.
        for label, value in (("grid_rows", grid_rows), ("grid_cols", grid_cols)):
            if not 1 <= value <= 255:
                raise ValueError(f"{label} must be between 1 and 255, got {value}")
        if self.width > 0xFFFF or self.height > 0xFFFF:
            raise ValueError(
                f"Image {self.width}x{self.height} exceeds the 65535 pixel "
                "limit of the tile metadata header"
            )

        # Bytes of each payload reserved for the tile metadata header.
        METADATA_SIZE = 8
        PIXEL_DATA_SIZE = PAYLOAD_SIZE - META_SIZE - METADATA_SIZE
        if PIXEL_DATA_SIZE <= 0:
            # A non-positive chunk size would make the chunk loop never end.
            raise RuntimeError(
                f"Atom payload too small for pixel data (PIXEL_DATA_SIZE={PIXEL_DATA_SIZE})"
            )

        start_time = time.perf_counter()
        self._log(f"[LOGOS] Dissolving Reality into {grid_rows}x{grid_cols} tiles...")

        # Round up so the grid always covers the whole image; tiles on the
        # bottom/right edge are clipped (and may be empty).
        tile_h = math.ceil(self.height / grid_rows)
        tile_w = math.ceil(self.width / grid_cols)

        atoms_out: List[bytes] = []
        for tr in range(grid_rows):
            y0 = tr * tile_h
            y1 = min(self.height, y0 + tile_h)
            for tc in range(grid_cols):
                x0 = tc * tile_w
                x1 = min(self.width, x0 + tile_w)
                tile_flat = self.img[y0:y1, x0:x1, :].flatten()

                # The heat code addresses the tile by its quadtree path.
                path = tile_to_quadtree_path(tr, tc, grid_rows, grid_cols)
                heat_code = calculate_heat_code(path)

                meta_bytes = encode_tile_metadata(
                    self.width, self.height, tr, tc, grid_rows, grid_cols
                )

                # Every atom repeats the metadata header; the chunk index is
                # passed as gap_id. An empty tile yields no atoms.
                for chunk_idx, offset in enumerate(
                    range(0, len(tile_flat), PIXEL_DATA_SIZE)
                ):
                    chunk = tile_flat[offset:offset + PIXEL_DATA_SIZE]
                    payload = meta_bytes + chunk.tobytes()
                    atoms_out.append(
                        pack_atom(heat_code, payload, domain_key="medium", gap_id=chunk_idx)
                    )

        with open(output_path, 'wb') as f:
            f.writelines(atoms_out)

        elapsed = time.perf_counter() - start_time
        raw_bytes = self.width * self.height * self.channels
        baked_bytes = len(atoms_out) * ATOM_SIZE
        # Baked size as a percentage of the raw RGB size.
        comp_ratio = (baked_bytes / raw_bytes) * 100 if raw_bytes else 0
        stats = {
            "waves": grid_rows * grid_cols,
            "atoms": len(atoms_out),
            "baked_bytes": baked_bytes,
            "raw_bytes": raw_bytes,
            "compression_pct": comp_ratio,
            "elapsed_seconds": elapsed,
            "width": self.width,
            "height": self.height,
            "grid": (grid_rows, grid_cols),
        }

        self._log(f"[LOGOS] Tiles: {grid_rows}x{grid_cols} = {grid_rows * grid_cols}")
        self._log(f"[LOGOS] Atoms: {len(atoms_out)}")
        self._log(f"[LOGOS] Baked Size: {baked_bytes/1024:.2f} KB ({comp_ratio:.1f}% of raw)")
        self._log(f"[LOGOS] Time: {elapsed:.3f}s")
        self._log("[STATE] DETERMINISTIC | Dissolution Complete")

        self.atoms = atoms_out
        return {"state": {"state": "DETERMINISTIC", "prime": "Fractal Addressing"}, "stats": stats}
def main():
    """Command-line entry point: bake one image into an SPCW stream file."""
    cli = argparse.ArgumentParser(description="LOGOS Baker: Image -> SPCW Stream")
    cli.add_argument("input", help="Source Image")
    cli.add_argument("output", help="Output .spcw file")
    cli.add_argument("--grid", type=int, nargs=2, default=[8, 8],
                     help="Grid dimensions (rows cols), default: 8 8")
    options = cli.parse_args()

    # nargs=2 guarantees exactly two values: rows first, then columns.
    rows, cols = options.grid
    try:
        LogosBaker(options.input).bake(options.output, grid_rows=rows, grid_cols=cols)
    except Exception as err:
        # Print a short message, then re-raise so the failure still propagates.
        print(f"[ERROR] {err}")
        raise


if __name__ == "__main__":
    main()