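# NOTE: the three lines below are hosting-page residue captured with this file; kept as comments.
# Fred808's picture
# Upload 256 files
# 7a0c684 verified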
"""
Memory pool implementation for efficient memory management
"""
import array
import threading
import weakref
from collections import deque
from typing import Dict, Any, List, Optional
class MemoryChunk:
    """A contiguous byte range of a pool, linked to its address-order neighbours.

    Attributes (all declared in ``__slots__``):
        size: Number of bytes covered by the chunk.
        data: ``array.array('B')`` backing store for the chunk.
        is_free: ``True`` while the chunk is available for allocation.
        offset: Position of the chunk's first byte inside the pool.
        next: Following chunk (higher offset), or ``None`` at the end.
        prev: Preceding chunk (lower offset), or ``None`` at the start.
    """

    # '__weakref__' must be listed explicitly: a class that defines __slots__
    # cannot be weakly referenced otherwise, and MemoryPool tracks live
    # allocations through weakref.ref(chunk).
    __slots__ = ('size', 'data', 'is_free', 'offset', 'next', 'prev', '__weakref__')

    def __init__(self, size: int, offset: int, data: Optional[array.array] = None):
        """Create a free, unlinked chunk.

        Args:
            size: Number of bytes the chunk covers.
            offset: Position of the chunk's first byte inside the pool.
            data: Optional existing backing store to adopt (used when a chunk
                is carved out of another one). When omitted, a zero-filled
                array of ``size`` bytes is created.
        """
        self.size = size
        if data is None:
            # Repeating a one-element array avoids building a temporary
            # Python list of `size` int objects first.
            data = array.array('B', [0]) * size
        self.data = data
        self.is_free = True
        self.offset = offset
        self.next: Optional['MemoryChunk'] = None
        self.prev: Optional['MemoryChunk'] = None


class MemoryPool:
    """Memory pool built on a doubly linked list of chunks.

    Attributes:
        total_size: Size of the pool in bytes.
        head: First chunk of the list (offset 0).
        free_chunks: Maps a chunk size to a deque of free chunks of exactly
            that size. Empty deques are removed, so every key has at least
            one chunk.
        _weak_refs: Maps the offset of each live allocation to a weak
            reference to its chunk.
    """

    __slots__ = ('total_size', 'head', 'free_chunks', '_weak_refs')

    def __init__(self, total_size: int):
        """Create a pool consisting of one free chunk of ``total_size`` bytes."""
        self.total_size = total_size
        self.head = MemoryChunk(total_size, 0)
        self.free_chunks = {total_size: deque([self.head])}  # size -> free chunks
        self._weak_refs: Dict[int, weakref.ref] = {}  # offset -> weakref to chunk

    def allocate(self, size: int) -> Optional[int]:
        """Allocate ``size`` bytes using a best-fit search.

        The request is rounded up to the next power of two. The smallest free
        chunk that can hold the rounded size is used; it is split only when it
        is more than twice the rounded size, otherwise the whole chunk is
        handed out.

        Args:
            size: Number of bytes requested.

        Returns:
            The offset of the allocated chunk, or ``None`` when ``size`` is
            not positive or no free chunk is large enough.
        """
        if size <= 0:
            # (size - 1).bit_length() is meaningless for non-positive sizes
            # (0 would become 2, negatives become arbitrary powers of two).
            return None

        # Round up to the nearest power of two for better reuse.
        size = 1 << (size - 1).bit_length()

        best_fit = min(
            (bucket_size for bucket_size, bucket in self.free_chunks.items()
             if bucket_size >= size and bucket),
            default=None,
        )
        if best_fit is None:
            return None

        chunk = self._take_free(best_fit)
        if chunk.size > size * 2:
            self._split(chunk, size)

        chunk.is_free = False
        # Track the allocation by offset. The callback runs only if the chunk
        # object is destroyed while still registered.
        self._weak_refs[chunk.offset] = weakref.ref(
            chunk, lambda ref, offset=chunk.offset: self._cleanup(offset))
        return chunk.offset

    def free(self, offset: int):
        """Release the allocation at ``offset`` and merge adjacent free chunks.

        Unknown offsets, already-freed offsets and allocations whose chunk no
        longer exists are ignored.

        Args:
            offset: Value previously returned by :meth:`allocate`.
        """
        chunk_ref = self._weak_refs.pop(offset, None)
        chunk = chunk_ref() if chunk_ref is not None else None
        # Drop the weak reference right away so its callback cannot re-enter
        # free() if the chunk object is discarded during merging below.
        del chunk_ref
        if chunk is None or chunk.is_free:
            return

        chunk.is_free = True

        # Absorb every free chunk that directly follows.
        while chunk.next is not None and chunk.next.is_free:
            self._remove_free(chunk.next)
            self._absorb_next(chunk)

        # Fold into every free chunk that directly precedes. The predecessor
        # is still filed under its pre-merge size, so it has to leave that
        # bucket before it grows.
        while chunk.prev is not None and chunk.prev.is_free:
            chunk = chunk.prev
            self._remove_free(chunk)
            self._absorb_next(chunk)

        self._add_free(chunk)

    def _cleanup(self, offset: int):
        """Weak-reference callback: release the bookkeeping for ``offset``."""
        self.free(offset)

    def get_fragmentation(self) -> float:
        """Return ``1 - largest_free_chunk / total_size``.

        The ratio is measured against the whole pool, so allocated bytes
        count towards it as well.

        Returns:
            ``0.0`` when there are no free chunks or the pool has no capacity,
            otherwise the ratio described above.
        """
        if self.total_size <= 0 or not any(self.free_chunks.values()):
            return 0.0
        return 1.0 - max(self.free_chunks) / self.total_size

    def _add_free(self, chunk: MemoryChunk) -> None:
        """File ``chunk`` in the free list under its current size."""
        self.free_chunks.setdefault(chunk.size, deque()).append(chunk)

    def _take_free(self, bucket_size: int) -> MemoryChunk:
        """Pop the oldest free chunk of ``bucket_size``, dropping an emptied bucket."""
        bucket = self.free_chunks[bucket_size]
        chunk = bucket.popleft()
        if not bucket:
            del self.free_chunks[bucket_size]
        return chunk

    def _remove_free(self, chunk: MemoryChunk) -> None:
        """Remove ``chunk`` from the bucket of its current size."""
        bucket = self.free_chunks[chunk.size]
        bucket.remove(chunk)
        if not bucket:
            del self.free_chunks[chunk.size]

    def _split(self, chunk: MemoryChunk, size: int) -> None:
        """Shrink ``chunk`` to ``size`` bytes and file the tail as a new free chunk."""
        tail = MemoryChunk(chunk.size - size, chunk.offset + size, chunk.data[size:])
        chunk.data = chunk.data[:size]
        chunk.size = size

        tail.prev = chunk
        tail.next = chunk.next
        if tail.next is not None:
            # Keep the list doubly linked: the old successor must point back
            # at the new tail, not at the shrunken chunk.
            tail.next.prev = tail
        chunk.next = tail

        self._add_free(tail)

    @staticmethod
    def _absorb_next(chunk: MemoryChunk) -> None:
        """Merge ``chunk.next`` into ``chunk`` (size, data and list links)."""
        absorbed = chunk.next
        chunk.size += absorbed.size
        chunk.data = chunk.data + absorbed.data
        chunk.next = absorbed.next
        if chunk.next is not None:
            chunk.next.prev = chunk
        # Detach the absorbed chunk so it cannot be used to walk the list.
        absorbed.next = absorbed.prev = None
class SharedMemoryPool(MemoryPool):
    """Memory pool that adds lock-protected allocation and per-block flags.

    Attributes:
        block_size: Size in bytes of one flag-table block.
        block_locks: ``array.array('B')`` with one flag per block. A flag is
            set to 1 by :meth:`atomic_allocate` for the block containing the
            first byte of the allocation, and reset to 0 by
            :meth:`atomic_free`. Only that first block is flagged, and
            allocations smaller than ``block_size`` can share a flag.
        _lock: Mutex serialising :meth:`atomic_allocate` and
            :meth:`atomic_free`. Direct calls to ``allocate`` / ``free``
            bypass it.
    """

    __slots__ = ('block_size', 'block_locks', '_lock')

    def __init__(self, total_size: int, block_size: int = 128):
        """Create the pool and its flag table.

        Args:
            total_size: Size of the pool in bytes.
            block_size: Size in bytes of one flag-table block.

        Raises:
            ValueError: If ``block_size`` is not positive.
        """
        if block_size <= 0:
            raise ValueError("block_size must be a positive integer")
        super().__init__(total_size)
        self.block_size = block_size
        # Ceiling division: a trailing partial block needs a flag too,
        # otherwise an allocation starting inside it indexes past the table.
        block_count = -(-total_size // block_size)
        self.block_locks = array.array('B', [0]) * block_count
        self._lock = threading.Lock()

    def atomic_allocate(self, size: int) -> Optional[int]:
        """Allocate ``size`` bytes while holding the pool lock.

        Args:
            size: Number of bytes requested.

        Returns:
            The offset of the allocation, or ``None`` if ``allocate`` could
            not satisfy the request.
        """
        with self._lock:
            offset = self.allocate(size)
            if offset is not None:
                self.block_locks[offset // self.block_size] = 1  # Mark as locked
            return offset

    def atomic_free(self, offset: int):
        """Release the allocation at ``offset`` while holding the pool lock.

        Offsets that do not belong to a live allocation are ignored, so the
        flag table is never touched for negative, out-of-range or
        already-freed offsets.

        Args:
            offset: Value previously returned by :meth:`atomic_allocate`.
        """
        with self._lock:
            if offset not in self._weak_refs:
                return
            self.block_locks[offset // self.block_size] = 0  # Release lock
            self.free(offset)