import array
import threading
import weakref
from collections import deque
from typing import Deque, Dict, List, Optional


class MemoryChunk:
    """A contiguous region of a pool, linked to its address-order neighbours."""

    # '__weakref__' must be listed explicitly: a class that defines __slots__
    # has no weak-reference support otherwise, and MemoryPool tracks live
    # allocations through weakref.ref(chunk).
    __slots__ = ('size', 'data', 'is_free', 'offset', 'next', 'prev',
                 '__weakref__')

    def __init__(self, size: int, offset: int):
        """Create a free, zero-filled chunk.

        Args:
            size: Number of bytes covered by the chunk.
            offset: Position of the chunk's first byte inside the pool.
        """
        self.size = size
        # Zero-filled byte storage; bytes(size) avoids building a temporary
        # Python list of `size` integers.
        self.data = array.array('B', bytes(size))
        self.is_free = True
        self.offset = offset
        # Neighbouring chunks in address order (doubly linked list).
        self.next: Optional['MemoryChunk'] = None
        self.prev: Optional['MemoryChunk'] = None


class MemoryPool:
    """Best-fit memory pool built on a doubly linked list of chunks.

    Free chunks are indexed by size in ``free_chunks`` so the smallest
    chunk that satisfies a request can be located without walking the list.
    """

    __slots__ = ('total_size', 'head', 'free_chunks', '_weak_refs')

    def __init__(self, total_size: int):
        """Create a pool consisting of a single free chunk.

        Args:
            total_size: Capacity of the pool in bytes.

        Raises:
            ValueError: If ``total_size`` is not positive.
        """
        if total_size <= 0:
            raise ValueError("total_size must be a positive integer")
        self.total_size = total_size
        self.head = MemoryChunk(total_size, 0)
        # Maps chunk size -> free chunks of exactly that size.
        self.free_chunks: Dict[int, Deque[MemoryChunk]] = {
            total_size: deque([self.head])
        }
        # Maps offset of each live allocation -> weak reference to its chunk.
        self._weak_refs: Dict[int, weakref.ref] = {}

    def _add_free(self, chunk: MemoryChunk) -> None:
        """File ``chunk`` in the free index under its current size."""
        bucket = self.free_chunks.get(chunk.size)
        if bucket is None:
            bucket = self.free_chunks[chunk.size] = deque()
        bucket.append(chunk)

    def _remove_free(self, chunk: MemoryChunk) -> None:
        """Remove ``chunk`` from the free index, dropping an emptied bucket.

        Must be called *before* ``chunk.size`` changes, otherwise the chunk
        would be looked up in the wrong bucket.
        """
        bucket = self.free_chunks[chunk.size]
        bucket.remove(chunk)
        if not bucket:
            del self.free_chunks[chunk.size]

    @staticmethod
    def _absorb_next(chunk: MemoryChunk) -> None:
        """Merge ``chunk.next`` into ``chunk`` and unlink it from the list."""
        absorbed = chunk.next
        chunk.size += absorbed.size
        # Keep len(data) == size after the merge.
        chunk.data.extend(absorbed.data)
        chunk.next = absorbed.next
        if chunk.next is not None:
            chunk.next.prev = chunk
        absorbed.next = absorbed.prev = None

    def allocate(self, size: int) -> Optional[int]:
        """Allocate memory with a best-fit strategy.

        The request is rounded up to the next power of two. The smallest
        free chunk that fits is used, and any surplus is split off and
        returned to the free index.

        Args:
            size: Number of bytes requested.

        Returns:
            Offset of the allocation inside the pool, or ``None`` when no
            free chunk is large enough.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        if size <= 0:
            raise ValueError("size must be a positive integer")

        # Round up to the nearest power of two for better reuse.
        size = 1 << (size - 1).bit_length()

        best_fit = min(
            (free_size for free_size, bucket in self.free_chunks.items()
             if free_size >= size and bucket),
            default=None,
        )
        if best_fit is None:
            return None

        chunk = self.free_chunks[best_fit].popleft()
        if not self.free_chunks[best_fit]:
            del self.free_chunks[best_fit]

        surplus = chunk.size - size
        if surplus > 0:
            remainder = MemoryChunk(surplus, chunk.offset + size)
            remainder.prev = chunk
            remainder.next = chunk.next
            # The old successor must point back at the remainder, otherwise
            # backward coalescing in free() would skip it.
            if chunk.next is not None:
                chunk.next.prev = remainder
            chunk.next = remainder
            chunk.size = size
            # Keep len(data) == size after the split.
            del chunk.data[size:]
            self._add_free(remainder)

        chunk.is_free = False
        self._weak_refs[chunk.offset] = weakref.ref(
            chunk,
            lambda _ref, offset=chunk.offset: self._cleanup(offset),
        )
        return chunk.offset

    def free(self, offset: int) -> None:
        """Free an allocation and coalesce it with adjacent free chunks.

        Unknown offsets and repeated frees of the same offset are ignored.

        Args:
            offset: Value previously returned by :meth:`allocate`.
        """
        # pop() also discards stale entries whose chunk no longer exists.
        chunk_ref = self._weak_refs.pop(offset, None)
        chunk = chunk_ref() if chunk_ref is not None else None
        if chunk is None or chunk.is_free:
            return

        chunk.is_free = True

        # Absorb free successors.
        while chunk.next is not None and chunk.next.is_free:
            self._remove_free(chunk.next)
            self._absorb_next(chunk)

        # Let free predecessors absorb this chunk. Each predecessor leaves
        # its old size bucket first so no stale index entry is left behind.
        while chunk.prev is not None and chunk.prev.is_free:
            survivor = chunk.prev
            self._remove_free(survivor)
            self._absorb_next(survivor)
            chunk = survivor

        self._add_free(chunk)

    def _cleanup(self, offset: int) -> None:
        """Weak-reference callback: release bookkeeping for ``offset``."""
        self.free(offset)

    def get_fragmentation(self) -> float:
        """Return the external fragmentation ratio of the free space.

        Computed as ``1 - largest_free_chunk / total_free_bytes``: ``0.0``
        when all free memory is one contiguous chunk (or nothing is free),
        approaching ``1.0`` as free memory is scattered over small chunks.
        """
        total_free = sum(
            size * len(bucket) for size, bucket in self.free_chunks.items()
        )
        if total_free == 0:
            return 0.0
        largest_free = max(
            size for size, bucket in self.free_chunks.items() if bucket
        )
        return 1.0 - (largest_free / total_free)


class SharedMemoryPool(MemoryPool):
    """Memory pool whose allocate/free entry points are serialised by a lock.

    ``block_locks`` holds one flag per ``block_size``-byte block; a flag is
    1 while at least one live allocation overlaps that block.
    """

    __slots__ = ('block_size', 'block_locks', '_lock')

    def __init__(self, total_size: int, block_size: int = 128):
        """Create the pool and its per-block flag array.

        Args:
            total_size: Capacity of the pool in bytes.
            block_size: Granularity of the ``block_locks`` flags in bytes.

        Raises:
            ValueError: If ``total_size`` or ``block_size`` is not positive.
        """
        if block_size <= 0:
            raise ValueError("block_size must be a positive integer")
        super().__init__(total_size)
        self.block_size = block_size
        # Ceiling division so a trailing partial block still gets a flag.
        block_count = -(-total_size // block_size)
        self.block_locks = array.array('B', bytes(block_count))
        # Serialises atomic_allocate / atomic_free across threads.
        self._lock = threading.Lock()

    def _block_span(self, chunk: MemoryChunk) -> range:
        """Return the indices of every block that ``chunk`` overlaps."""
        first = chunk.offset // self.block_size
        last = (chunk.offset + chunk.size - 1) // self.block_size
        return range(first, last + 1)

    def _block_shared(self, chunk: MemoryChunk, block_idx: int) -> bool:
        """Tell whether a live allocation other than ``chunk`` overlaps a block."""
        block_size = self.block_size
        node = chunk.prev
        while (node is not None
               and (node.offset + node.size - 1) // block_size >= block_idx):
            if not node.is_free:
                return True
            node = node.prev
        node = chunk.next
        while node is not None and node.offset // block_size <= block_idx:
            if not node.is_free:
                return True
            node = node.next
        return False

    def atomic_allocate(self, size: int) -> Optional[int]:
        """Allocate under the pool lock and flag every block spanned.

        Args:
            size: Number of bytes requested.

        Returns:
            Offset of the allocation, or ``None`` when the pool cannot
            satisfy the request.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        with self._lock:
            offset = self.allocate(size)
            if offset is not None:
                chunk = self._weak_refs[offset]()
                for block_idx in self._block_span(chunk):
                    self.block_locks[block_idx] = 1  # Mark as locked
            return offset

    def atomic_free(self, offset: int) -> None:
        """Free under the pool lock and clear flags of blocks no longer used.

        Unknown or already-freed offsets are ignored and leave every flag
        untouched. A block shared with another live allocation stays flagged.

        Args:
            offset: Value previously returned by :meth:`atomic_allocate`.
        """
        with self._lock:
            chunk_ref = self._weak_refs.get(offset)
            chunk = chunk_ref() if chunk_ref is not None else None
            if chunk is None or chunk.is_free:
                return
            # Decide before freeing: coalescing rewires the neighbour links.
            releasable = [
                block_idx for block_idx in self._block_span(chunk)
                if not self._block_shared(chunk, block_idx)
            ]
            self.free(offset)
            for block_idx in releasable:
                self.block_locks[block_idx] = 0  # Release lock