Spaces:
Runtime error
Runtime error
File size: 8,326 Bytes
ac73ca8 b8b5c5a ac73ca8 b8b5c5a ac73ca8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
"""
bake_stream.py - The LOGOS Encoder (Phase 4 - Fractal Round-Trip)
Spatial tiling with proper heat code addressing for lossless reconstruction.
Each tile's position is encoded into its heat code; each atom payload carries an 8-byte tile metadata header followed by raw RGB pixel data.
"""
import math
import time
import cv2
import argparse
import struct
from typing import List, Tuple
from .logos_core import (
calculate_heat_code,
pack_atom,
PAYLOAD_SIZE,
ATOM_SIZE,
META_SIZE,
)
# ==========================================
# FRACTAL TILE ADDRESSING
# ==========================================
def tile_to_quadtree_path(tile_row: int, tile_col: int, grid_rows: int, grid_cols: int) -> List[int]:
    """
    Translate a tile's grid coordinates into a list of quadtree quadrant picks.

    The grid window is halved along both axes repeatedly. At every level the
    quadrant holding the tile is recorded and the window shrinks to that
    quadrant. The walk stops once the window is at most one tile tall and one
    tile wide, or after 16 levels.

    Args:
        tile_row: Row index of tile (0-based)
        tile_col: Column index of tile (0-based)
        grid_rows: Total rows in grid
        grid_cols: Total columns in grid

    Returns:
        Quadrant choices from the coarsest level down, each in 0..3
        (0=TL, 1=TR, 2=BL, 3=BR; bit 1 = bottom half, bit 0 = right half).
    """
    max_depth = 16  # Max 16 levels (32-bit heat code)
    top, bottom = 0, grid_rows
    left, right = 0, grid_cols
    choices: List[int] = []

    depth = 0
    while depth < max_depth and (bottom - top > 1 or right - left > 1):
        depth += 1
        row_split = (top + bottom) // 2
        col_split = (left + right) // 2

        # The split-below-upper-bound guard keeps an empty window from ever
        # choosing the far half on that axis.
        goes_down = row_split < bottom and tile_row >= row_split
        goes_right = col_split < right and tile_col >= col_split

        # Two bits per level: high bit = bottom half, low bit = right half.
        choices.append(int(goes_down) * 2 + int(goes_right))

        # Shrink the window to the chosen quadrant.
        if goes_down:
            top = row_split
        else:
            bottom = row_split
        if goes_right:
            left = col_split
        else:
            right = col_split

    return choices
def encode_tile_metadata(width: int, height: int, tile_row: int, tile_col: int,
                         grid_rows: int, grid_cols: int) -> bytes:
    """
    Pack the image size and the tile/grid coordinates into an 8-byte header.

    Layout (big-endian, unsigned):
        [img_w:2B][img_h:2B][tile_row:1B][tile_col:1B][grid_rows:1B][grid_cols:1B]

    Values that do not fit their field make ``struct.pack`` raise
    ``struct.error``.
    """
    image_size = (width, height)
    tile_position = (tile_row, tile_col)
    grid_shape = (grid_rows, grid_cols)
    return struct.pack('>HHBBBB', *image_size, *tile_position, *grid_shape)
def decode_tile_metadata(payload: bytes) -> Tuple[int, int, int, int, int, int]:
    """
    Read the 8-byte tile header back out of the start of a payload.

    Returns:
        (img_width, img_height, tile_row, tile_col, grid_rows, grid_cols).
        A payload shorter than 8 bytes yields six zeros.
    """
    header_len = 8
    if len(payload) >= header_len:
        # Only the leading header bytes are parsed; the rest is ignored here.
        return struct.unpack('>HHBBBB', payload[:header_len])
    return (0, 0, 0, 0, 0, 0)
# ==========================================
# TILE BAKER
# ==========================================
class LogosBaker:
    """
    Phase 4 Baker: Fractal Tile Encoding for Round-Trip Reconstruction

    - Each tile's position is encoded into its heat code
    - Payload contains tile metadata + raw pixel data
    - Intended to let a decoder rebuild the original image from the stream

    After construction the instance holds the RGB image (``img``), its
    ``width``/``height``/``channels``, and ``atoms`` (empty until ``bake``
    has run, then the list of packed atoms from the most recent bake).
    """

    def __init__(self, source_path: str, event_callback=None):
        """
        Load the source image and remember its dimensions.

        Args:
            source_path: Path of the image file handed to ``cv2.imread``.
            event_callback: Optional callable that receives every log message
                as a single string argument.

        Raises:
            ValueError: If ``cv2.imread`` returns ``None`` for the path.
        """
        self.source_path = source_path
        self.event_callback = event_callback
        self.atoms: List[bytes] = []

        # Load image
        img = cv2.imread(self.source_path)
        if img is None:
            raise ValueError(f"Could not load source: {self.source_path}")

        # Pixels are stored and emitted in RGB channel order.
        self.img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.height, self.width, self.channels = self.img.shape

        self._log(f"[LOGOS] Reality Ingested: {self.width}x{self.height}")
        self._log("[LOGOS] Mode: Fractal Tile Encoding (Round-Trip)")

    def _log(self, msg: str):
        """Print ``msg`` and forward it to ``event_callback`` when one is set."""
        print(msg)
        if self.event_callback:
            try:
                self.event_callback(msg)
            except Exception:
                # Deliberately best-effort: a failing listener must not
                # abort the bake.
                pass

    def bake(self, output_path: str, grid_rows: int = 8, grid_cols: int = 8) -> dict:
        """
        Bake image into SPCW atom stream.

        The image is cut into ``grid_rows`` x ``grid_cols`` tiles. Each tile's
        flattened pixels are split into chunks, every chunk is prefixed with
        the 8-byte tile header, and the result is packed into an atom whose
        heat code is derived from the tile's quadtree path. All atoms are then
        written to ``output_path`` in tile order (row-major), chunk order.

        Args:
            output_path: Output .spcw file path
            grid_rows: Number of tile rows (default 8), 1..255
            grid_cols: Number of tile columns (default 8), 1..255

        Returns:
            ``{"state": {...}, "stats": {...}}`` where ``stats`` holds the tile
            count ("waves"), atom count, byte sizes, the baked size as a
            percentage of the raw size, elapsed seconds, image size and grid.

        Raises:
            ValueError: If a grid dimension is outside 1..255, an image side
                exceeds 65535 (neither fits the tile header), or the atom
                layout leaves no room for pixel data.
        """
        start_time = time.perf_counter()

        # Fail early with a clear message instead of a ZeroDivisionError from
        # the tile-size maths or a struct.error from the header packing.
        if not (1 <= grid_rows <= 255 and 1 <= grid_cols <= 255):
            raise ValueError(
                f"Grid dimensions must be between 1 and 255, got {grid_rows}x{grid_cols}"
            )
        if self.width > 0xFFFF or self.height > 0xFFFF:
            raise ValueError(
                f"Image {self.width}x{self.height} exceeds the 65535 limit of the tile header"
            )

        self._log(f"[LOGOS] Dissolving Reality into {grid_rows}x{grid_cols} tiles...")

        # Calculate tile dimensions (edge tiles may be smaller)
        tile_h = math.ceil(self.height / grid_rows)
        tile_w = math.ceil(self.width / grid_cols)

        # Metadata for payload: the 8-byte tile header precedes the pixels.
        METADATA_SIZE = 8
        PIXEL_DATA_SIZE = PAYLOAD_SIZE - META_SIZE - METADATA_SIZE
        if PIXEL_DATA_SIZE <= 0:
            # A non-positive chunk size would make the chunk loop spin forever.
            raise ValueError(
                f"No room for pixel data in payload (PIXEL_DATA_SIZE={PIXEL_DATA_SIZE})"
            )

        atoms_out = []

        # Process each tile
        for tr in range(grid_rows):
            for tc in range(grid_cols):
                # Extract tile region, clamped to the image bounds
                y0 = tr * tile_h
                y1 = min(self.height, y0 + tile_h)
                x0 = tc * tile_w
                x1 = min(self.width, x0 + tile_w)
                tile = self.img[y0:y1, x0:x1, :]

                # Compute quadtree path for this tile's position
                path = tile_to_quadtree_path(tr, tc, grid_rows, grid_cols)
                heat_code = calculate_heat_code(path)

                # Build payload: metadata + pixel data
                meta_bytes = encode_tile_metadata(
                    self.width, self.height, tr, tc, grid_rows, grid_cols
                )

                # Flatten tile pixels (RGB). A tile that lies entirely outside
                # the image is empty and therefore emits no atoms.
                tile_flat = tile.flatten()

                # Split tile into atoms (all atoms include metadata for decoding)
                chunk_idx = 0
                offset = 0
                while offset < len(tile_flat):
                    chunk = tile_flat[offset:offset + PIXEL_DATA_SIZE]
                    offset += PIXEL_DATA_SIZE

                    # All atoms include metadata for proper decoding
                    payload = meta_bytes + chunk.tobytes()

                    # Heat code encodes tile position; gap_id carries the
                    # chunk's index within its tile.
                    atom = pack_atom(heat_code, payload, domain_key="medium", gap_id=chunk_idx)
                    atoms_out.append(atom)
                    chunk_idx += 1

        # Write stream
        with open(output_path, 'wb') as f:
            f.writelines(atoms_out)

        # Stats
        elapsed = time.perf_counter() - start_time
        raw_bytes = self.width * self.height * self.channels
        baked_bytes = len(atoms_out) * ATOM_SIZE
        # Baked size expressed as a percentage of the raw pixel size.
        comp_ratio = (baked_bytes / raw_bytes) * 100 if raw_bytes else 0

        stats = {
            "waves": grid_rows * grid_cols,
            "atoms": len(atoms_out),
            "baked_bytes": baked_bytes,
            "raw_bytes": raw_bytes,
            "compression_pct": comp_ratio,
            "elapsed_seconds": elapsed,
            "width": self.width,
            "height": self.height,
            "grid": (grid_rows, grid_cols),
        }

        self._log(f"[LOGOS] Tiles: {grid_rows}x{grid_cols} = {grid_rows * grid_cols}")
        self._log(f"[LOGOS] Atoms: {len(atoms_out)}")
        self._log(f"[LOGOS] Baked Size: {baked_bytes/1024:.2f} KB ({comp_ratio:.1f}% of raw)")
        self._log(f"[LOGOS] Time: {elapsed:.3f}s")
        self._log("[STATE] DETERMINISTIC | Dissolution Complete")

        self.atoms = atoms_out
        return {"state": {"state": "DETERMINISTIC", "prime": "Fractal Addressing"}, "stats": stats}
def main():
    """Command-line entry point: bake one source image into an SPCW stream file.

    Any exception from loading or baking is reported on stdout with an
    ``[ERROR]`` prefix and then re-raised.
    """
    cli = argparse.ArgumentParser(description="LOGOS Baker: Image -> SPCW Stream")
    cli.add_argument("input", help="Source Image")
    cli.add_argument("output", help="Output .spcw file")
    cli.add_argument(
        "--grid",
        type=int,
        nargs=2,
        default=[8, 8],
        help="Grid dimensions (rows cols), default: 8 8",
    )
    options = cli.parse_args()

    try:
        baker = LogosBaker(options.input)
        # --grid always carries exactly two integers: rows, then columns.
        rows, cols = options.grid
        baker.bake(options.output, grid_rows=rows, grid_cols=cols)
    except Exception as err:
        print(f"[ERROR] {err}")
        raise


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|