INV / virtual_gpu_driver /src /memory_pool.py
Fred808's picture
Upload 256 files
7a0c684 verified
from typing import Optional, Dict, List
import array
from collections import deque
import weakref
class MemoryChunk:
"""Represents a chunk of memory that can be reused"""
__slots__ = ('size', 'data', 'is_free', 'offset', 'next', 'prev')
def __init__(self, size: int, offset: int):
self.size = size
self.data = array.array('B', [0] * size) # Use array instead of bytearray for better memory efficiency
self.is_free = True
self.offset = offset
self.next: Optional['MemoryChunk'] = None
self.prev: Optional['MemoryChunk'] = None
class MemoryPool:
"""Efficient memory pool implementation using linked chunks"""
__slots__ = ('total_size', 'head', 'free_chunks', '_weak_refs')
def __init__(self, total_size: int):
self.total_size = total_size
self.head = MemoryChunk(total_size, 0)
self.free_chunks = {total_size: deque([self.head])} # Map sizes to available chunks
self._weak_refs: Dict[int, weakref.ref] = {} # Track allocations with weak references
def allocate(self, size: int) -> Optional[int]:
"""Allocate memory with best-fit strategy"""
# Round up size to nearest power of 2 for better reuse
size = 1 << (size - 1).bit_length()
# Find smallest chunk that fits
for chunk_size in sorted(self.free_chunks.keys()):
if chunk_size >= size and self.free_chunks[chunk_size]:
chunk = self.free_chunks[chunk_size].popleft()
if not self.free_chunks[chunk_size]:
del self.free_chunks[chunk_size]
# Split chunk if too large
if chunk.size > size * 2:
remaining_size = chunk.size - size
new_chunk = MemoryChunk(remaining_size, chunk.offset + size)
new_chunk.prev = chunk
new_chunk.next = chunk.next
chunk.next = new_chunk
chunk.size = size
# Add remaining to free list
if remaining_size not in self.free_chunks:
self.free_chunks[remaining_size] = deque()
self.free_chunks[remaining_size].append(new_chunk)
chunk.is_free = False
# Create weak reference to track usage
self._weak_refs[chunk.offset] = weakref.ref(chunk,
lambda ref, offset=chunk.offset: self._cleanup(offset))
return chunk.offset
return None
def free(self, offset: int):
"""Free allocated memory and merge adjacent free chunks"""
if offset not in self._weak_refs:
return
chunk_ref = self._weak_refs[offset]
chunk = chunk_ref()
if chunk is None or chunk.is_free:
return
chunk.is_free = True
# Merge with next chunk if free
while chunk.next and chunk.next.is_free:
next_chunk = chunk.next
chunk.size += next_chunk.size
chunk.next = next_chunk.next
if chunk.next:
chunk.next.prev = chunk
# Remove next chunk from free list
self.free_chunks[next_chunk.size].remove(next_chunk)
if not self.free_chunks[next_chunk.size]:
del self.free_chunks[next_chunk.size]
# Merge with previous chunk if free
while chunk.prev and chunk.prev.is_free:
prev_chunk = chunk.prev
prev_chunk.size += chunk.size
prev_chunk.next = chunk.next
if chunk.next:
chunk.next.prev = prev_chunk
chunk = prev_chunk
# Add to free list
if chunk.size not in self.free_chunks:
self.free_chunks[chunk.size] = deque()
self.free_chunks[chunk.size].append(chunk)
# Clean up weak reference
del self._weak_refs[offset]
def _cleanup(self, offset: int):
"""Callback for when a chunk is garbage collected"""
self.free(offset)
def get_fragmentation(self) -> float:
"""Calculate memory fragmentation ratio"""
free_chunks = sum(len(chunks) for chunks in self.free_chunks.values())
if free_chunks == 0:
return 0.0
largest_free = max(self.free_chunks.keys()) if self.free_chunks else 0
return 1.0 - (largest_free / self.total_size)
class SharedMemoryPool(MemoryPool):
"""Memory pool optimized for shared memory access patterns"""
__slots__ = ('block_size', 'block_locks')
def __init__(self, total_size: int, block_size: int = 128):
super().__init__(total_size)
self.block_size = block_size
self.block_locks = array.array('B', [0] * (total_size // block_size)) # Use array for atomic operations
def atomic_allocate(self, size: int) -> Optional[int]:
"""Thread-safe allocation"""
offset = self.allocate(size)
if offset is not None:
block_idx = offset // self.block_size
self.block_locks[block_idx] = 1 # Mark as locked
return offset
def atomic_free(self, offset: int):
"""Thread-safe deallocation"""
block_idx = offset // self.block_size
self.block_locks[block_idx] = 0 # Release lock
self.free(offset)