|
|
import array
import threading
import weakref
from collections import deque
from typing import Optional, Dict, List
|
|
|
|
|
|
class MemoryChunk:
    """A contiguous region of a memory pool, linked to its neighbours.

    Attributes:
        size: Length of the region in bytes.
        data: ``array.array('B')`` backing buffer, zero-filled on creation.
        is_free: ``True`` while the region is available for allocation.
        offset: Start of the region, relative to the start of the pool.
        next: The following chunk in the pool's chunk list, or ``None``.
        prev: The preceding chunk in the pool's chunk list, or ``None``.
    """

    # '__weakref__' is required because MemoryPool.allocate() takes a
    # weakref.ref to every chunk it hands out; a slotted class without that
    # slot raises TypeError ("cannot create weak reference").
    __slots__ = ('size', 'data', 'is_free', 'offset', 'next', 'prev',
                 '__weakref__')

    def __init__(self, size: int, offset: int):
        """Create a free, unlinked chunk of ``size`` bytes at ``offset``."""
        self.size = size
        # Repeating a one-element array avoids building a temporary list of
        # ``size`` Python ints just to zero-fill the buffer.
        self.data = array.array('B', [0]) * size
        self.is_free = True
        self.offset = offset
        self.next: Optional['MemoryChunk'] = None
        self.prev: Optional['MemoryChunk'] = None


class MemoryPool:
    """Memory pool built on a doubly linked list of :class:`MemoryChunk`.

    The pool starts as one free chunk spanning ``total_size`` bytes.
    ``free_chunks`` maps a chunk size to a deque of the free chunks of exactly
    that size, and ``_weak_refs`` maps the offset of every live allocation to
    a weak reference to its chunk.
    """

    __slots__ = ('total_size', 'head', 'free_chunks', '_weak_refs')

    def __init__(self, total_size: int):
        """Create a pool consisting of a single free chunk of ``total_size``."""
        self.total_size = total_size
        self.head = MemoryChunk(total_size, 0)
        self.free_chunks: Dict[int, deque] = {total_size: deque([self.head])}
        self._weak_refs: Dict[int, weakref.ref] = {}

    def allocate(self, size: int) -> Optional[int]:
        """Allocate memory with a best-fit strategy.

        The request is rounded up to the next power of two and served from
        the smallest free chunk that can hold it.

        Args:
            size: Number of bytes requested.

        Returns:
            The offset of the allocated chunk, or ``None`` when ``size`` is
            not positive or no free chunk is large enough.
        """
        if size <= 0:
            # (size - 1).bit_length() is meaningless for non-positive sizes
            # and would silently hand out a chunk nobody asked for.
            return None

        size = 1 << (size - 1).bit_length()

        best = min(
            (candidate for candidate, bucket in self.free_chunks.items()
             if candidate >= size and bucket),
            default=None,
        )
        if best is None:
            return None

        bucket = self.free_chunks[best]
        chunk = bucket.popleft()
        if not bucket:
            del self.free_chunks[best]

        # Split only when the remainder would be larger than the request
        # itself; otherwise the whole chunk is handed out.
        if chunk.size > size * 2:
            self._split(chunk, size)

        chunk.is_free = False
        self._weak_refs[chunk.offset] = weakref.ref(
            chunk, lambda ref, offset=chunk.offset: self._cleanup(offset))
        return chunk.offset

    def free(self, offset: int):
        """Free the allocation at ``offset`` and merge adjacent free chunks.

        Offsets that do not identify a live allocation are ignored.

        Args:
            offset: A value previously returned by :meth:`allocate`.
        """
        chunk_ref = self._weak_refs.pop(offset, None)
        if chunk_ref is None:
            return

        chunk = chunk_ref()
        # Discard the weak reference now: merging below may drop the last
        # strong reference to this chunk, and a weakref that no longer exists
        # cannot fire its callback and re-enter free().
        del chunk_ref
        if chunk is None or chunk.is_free:
            return

        chunk.is_free = True

        # Absorb every free successor into this chunk.
        while chunk.next is not None and chunk.next.is_free:
            absorbed = chunk.next
            self._remove_free(absorbed)
            chunk.size += absorbed.size
            chunk.data.extend(absorbed.data)
            chunk.next = absorbed.next
            if chunk.next is not None:
                chunk.next.prev = chunk

        # Fold this chunk into every free predecessor.
        while chunk.prev is not None and chunk.prev.is_free:
            survivor = chunk.prev
            # The predecessor is indexed under its current size; unregister
            # it before growing it so no stale entry is left behind.
            self._remove_free(survivor)
            survivor.size += chunk.size
            survivor.data.extend(chunk.data)
            survivor.next = chunk.next
            if chunk.next is not None:
                chunk.next.prev = survivor
            chunk = survivor

        self._add_free(chunk)

    def _split(self, chunk: MemoryChunk, size: int) -> None:
        """Shrink ``chunk`` to ``size`` bytes and register the tail as free."""
        tail = MemoryChunk(chunk.size - size, chunk.offset + size)
        tail.prev = chunk
        tail.next = chunk.next
        if tail.next is not None:
            # Keep the back-pointer of the old successor consistent, otherwise
            # a later backward merge would skip over the new tail.
            tail.next.prev = tail
        chunk.next = tail
        chunk.size = size
        # Keep the backing buffer the same length as the chunk.
        del chunk.data[size:]
        self._add_free(tail)

    def _add_free(self, chunk: MemoryChunk) -> None:
        """Register ``chunk`` in the free index under its current size."""
        self.free_chunks.setdefault(chunk.size, deque()).append(chunk)

    def _remove_free(self, chunk: MemoryChunk) -> None:
        """Remove ``chunk`` from the free index, dropping an emptied bucket."""
        bucket = self.free_chunks[chunk.size]
        bucket.remove(chunk)
        if not bucket:
            del self.free_chunks[chunk.size]

    def _cleanup(self, offset: int):
        """Weak-reference callback: a chunk registered at ``offset`` died."""
        self.free(offset)

    def get_fragmentation(self) -> float:
        """Return the fragmentation ratio of the free memory.

        The ratio is ``1 - largest_free_chunk / total_free_bytes``: ``0.0``
        when all free memory is one contiguous chunk (or nothing is free),
        approaching ``1.0`` as free memory is scattered over small chunks.
        """
        total_free = sum(
            size * len(bucket) for size, bucket in self.free_chunks.items())
        if total_free == 0:
            return 0.0
        return 1.0 - (max(self.free_chunks) / total_free)
|
|
|
|
|
|
class SharedMemoryPool(MemoryPool):
    """Memory pool that tracks per-block flags and serialises its atomic ops.

    The pool is divided into blocks of ``block_size`` bytes. ``block_locks``
    holds one byte per block; ``atomic_allocate`` sets the flag of the block
    containing the start of the allocation and ``atomic_free`` clears it.

    NOTE(review): only the first block of an allocation is flagged, and the
    flag is cleared when any allocation starting in that block is freed, even
    if another allocation in the same block is still live.
    """

    __slots__ = ('block_size', 'block_locks', '_lock')

    def __init__(self, total_size: int, block_size: int = 128):
        """Create the pool and a cleared flag for every block.

        Args:
            total_size: Size of the pool in bytes.
            block_size: Size of one block in bytes; must be positive.

        Raises:
            ValueError: If ``block_size`` is not positive.
        """
        if block_size <= 0:
            raise ValueError("block_size must be a positive integer")
        super().__init__(total_size)
        self.block_size = block_size
        # Ceiling division: a trailing partial block still needs a flag,
        # otherwise allocations that start in it index past the end.
        block_count = -(-total_size // block_size)
        self.block_locks = array.array('B', [0]) * block_count
        # Guards atomic_allocate()/atomic_free(); plain allocate()/free()
        # calls are not covered by it.
        self._lock = threading.Lock()

    def atomic_allocate(self, size: int) -> Optional[int]:
        """Allocate ``size`` bytes while holding the pool lock.

        Returns:
            The offset of the allocation, or ``None`` if it could not be made.
        """
        with self._lock:
            offset = self.allocate(size)
            if offset is not None:
                self.block_locks[offset // self.block_size] = 1
            return offset

    def atomic_free(self, offset: int):
        """Free the allocation at ``offset`` while holding the pool lock.

        Offsets that do not identify a live allocation are ignored, so an
        invalid offset can neither clear an unrelated block flag nor index
        outside ``block_locks``.
        """
        with self._lock:
            # _weak_refs is the inherited registry of live allocations,
            # keyed by offset.
            if offset not in self._weak_refs:
                return
            self.block_locks[offset // self.block_size] = 0
            self.free(offset)
|
|
|
|