import logging

try:
    from logos.network.dissolution import DissolutionEngine
except ImportError:
    # DissolutionEngine is not referenced anywhere in this module; the name is
    # kept for compatibility, but its absence must not prevent the storage
    # engine (or the self-test at the bottom) from running standalone.
    DissolutionEngine = None

logger = logging.getLogger("StorageEngine")


class StorageShard:
    """A Fractal Shard of Data.

    Attributes:
        id: Identifier of the shard (e.g. ``"block_0.2.1.3"``).
        data: The bytes-like payload carried by this shard.
        level: Decomposition depth. 0=Root(4k), 1=1k, 2=256B, 3=64B.
        heat: Fraction of set bits in ``data`` (0.0-1.0), computed once at
            construction time.
        target_node: Node assigned by ``StorageEngine.link_to_topology``;
            ``None`` until linked.
    """

    def __init__(self, shard_id, data, level):
        self.id = shard_id
        self.data = data
        self.level = level  # 0=Root(4k), 1=1k, 2=256B, 3=64B
        self.heat = self._calculate_heat()
        self.target_node = None

    def _calculate_heat(self) -> float:
        """Return the shard's heat: the ratio of 1-bits to total bits.

        This is a cheap stand-in for an entropy measure, not a true Shannon
        entropy. Empty (or missing) data has a heat of 0.0. The result is
        rounded to four decimal places.
        """
        if not self.data:
            return 0.0
        total_bits = len(self.data) * 8
        # Iterating a bytes-like object yields ints in range(256).
        set_bits = sum(bin(byte).count("1") for byte in self.data)
        return round(set_bits / total_bits, 4)

    def __repr__(self) -> str:
        size = len(self.data) if self.data else 0
        return (
            f"<StorageShard id={self.id!r} level={self.level} "
            f"size={size}B heat={self.heat}>"
        )


class StorageEngine:
    """
    Protocol 14: Fractal Storage Engine.
    Implements Quad-Tree Decomposition and Heat-Based Linking.
    """

    # Nominal shard size in bytes per decomposition level. Only the level-0
    # entry is read by the code below (as the root block size); deeper levels
    # are produced by repeatedly quartering a block.
    SHARD_SIZES = {
        0: 4096,  # 4KB
        1: 1024,  # 1KB
        2: 256,   # 256B
        3: 64,    # 64B (Atom)
    }

    # Deepest decomposition level; shards at this level are never split.
    _ATOM_LEVEL = 3

    @staticmethod
    def store_file(file_content):
        """Fracture ``file_content`` into a flat list of leaf shards.

        Args:
            file_content: ``str`` (encoded as UTF-8 first) or a bytes-like
                object.

        Returns:
            A list of ``StorageShard`` leaves in original byte order.
            Concatenating their ``data`` reproduces the input exactly.
            Empty input yields an empty list.
        """
        # Convert to bytes if needed
        if isinstance(file_content, str):
            file_content = file_content.encode("utf-8")

        logger.info("Fracturing %d bytes...", len(file_content))

        # Level 0: cut the input into root blocks, then quarter each one.
        shards = []
        block_size = StorageEngine.SHARD_SIZES[0]
        for offset in range(0, len(file_content), block_size):
            chunk = file_content[offset:offset + block_size]
            root_shard = StorageShard(f"block_{offset // block_size}", chunk, 0)
            StorageEngine._decompose(root_shard, shards)

        return shards

    @staticmethod
    def _decompose(parent_shard, shard_list):
        """Recursively split ``parent_shard`` into up to four children.

        Leaves are appended to ``shard_list`` in byte order. A shard becomes
        a leaf when it is already at the atom level (3) or holds fewer than
        four bytes; otherwise it is quartered and each non-empty quarter is
        decomposed in turn. Children are named ``"<parent id>.<k>"``.
        """
        next_level = parent_shard.level + 1
        size = len(parent_shard.data)

        # Stop at Atom Level (Level 3 / 64B), or when too small to quarter.
        if next_level > StorageEngine._ATOM_LEVEL or size < 4:
            shard_list.append(parent_shard)
            return

        # Ceiling division: floor division would silently drop the trailing
        # ``size % 4`` bytes whenever the length is not a multiple of four.
        split_size = -(-size // 4)

        for k in range(4):
            start = k * split_size
            sub_data = parent_shard.data[start:start + split_size]
            if not sub_data:
                # With ceiling division the last quarter can be empty
                # (e.g. 5 bytes -> 2 + 2 + 1 + 0).
                continue
            child_shard = StorageShard(
                f"{parent_shard.id}.{k}", sub_data, next_level
            )
            StorageEngine._decompose(child_shard, shard_list)

    @staticmethod
    def link_to_topology(shards, topology):
        """Assign each shard to an integer node based on its heat.

        Hot -> Prime. Cold -> Composite. Each shard's ``target_node`` is set
        as a side effect.

        Args:
            shards: Iterable of ``StorageShard``.
            topology: Currently unused; accepted for interface compatibility.

        Returns:
            Dict mapping shard id to a ``"<node> (<NODE_TYPE>)"`` label.
        """
        assignments = {}
        for shard in shards:
            if shard.heat > 0.6:
                # Hot Data -> Mersenne Primes (3, 7, 31, 127)
                target = 7  # Simplification for prototype
                node_type = "MERSENNE_PRIME"
            elif shard.heat > 0.4:
                # Warm Data -> Prime Gateways (2, 5, 11)
                target = 5
                node_type = "PRIME_GATEWAY"
            else:
                # Cold Data -> Abundant Hubs (12, 24)
                target = 12
                node_type = "ABUNDANT_HUB"

            shard.target_node = target
            assignments[shard.id] = f"{target} ({node_type})"

        return assignments


if __name__ == "__main__":
    # Test Run
    data = b"X" * 5000  # Dummy data
    shards = StorageEngine.store_file(data)
    print(f"Fractured into {len(shards)} shards.")
    print(f"Sample Shard: {shards[0]}")

    links = StorageEngine.link_to_topology(shards, None)
    print(f"Sample Link: {links[shards[0].id]}")