# NEWM / virtual_ssd / virtual_ssd_web / src / ssd_controller.py
# Factor Studios — "Upload 167 files", commit 684cc60 (verified)
# (File-hosting page header converted to comments so the module parses as Python.)
"""
SSD Controller Module - Implements Flash Translation Layer (FTL)
"""
import json
import os
import time
import threading
import random
import base64
from typing import Dict, List, Optional, Tuple
class SSDController:
"""
Implements Flash Translation Layer (FTL) functionality.
Maps Logical Block Addresses (LBA) to Physical Page Numbers (PPN).
Handles wear leveling and garbage collection.
Stores FTL data entirely in-memory and provides snapshot functionality for persistence.
"""
def __init__(self, virtual_flash, logical_blocks: int, pages_per_logical_block: int = 1):
self.virtual_flash = virtual_flash
self.logical_blocks = logical_blocks
self.pages_per_logical_block = pages_per_logical_block
self.lock = threading.RLock()
# In-memory FTL mapping table
self.ftl_data = {
"lba_to_ppn": {}, # Logical Block Address to Physical Page Number
"ppn_to_lba": {}, # Physical Page Number to Logical Block Address (reverse mapping)
"invalid_pages": [], # List of invalid physical pages
"wear_leveling_threshold": 100,
"gc_threshold": 0.8, # Garbage collection threshold (80% full)
"write_count": 0,
"gc_count": 0
}
# Track next available physical page for allocation
self.next_physical_page = self._find_next_physical_page()
print(f"SSDController initialized: {logical_blocks} logical blocks, FTL mapping in-memory")
def _find_next_physical_page(self) -> int:
"""Find the next available physical page for allocation."""
used_pages = set()
for ppn_str in self.ftl_data["lba_to_ppn"].values():
if isinstance(ppn_str, list):
used_pages.update(ppn_str)
else:
used_pages.add(ppn_str)
# Find first unused page
page = 0
while page in used_pages:
page += 1
return page
def _allocate_physical_page(self) -> int:
"""
Allocate a new physical page.
This is a simplified allocation that just increments the next_physical_page counter.
In a real FTL, this would involve finding free pages in erase blocks.
"""
with self.lock:
# Find next available page
# This simplified logic assumes continuous allocation and doesn't reuse freed pages efficiently
# A more advanced FTL would manage free page lists within blocks.
allocated_page = self.next_physical_page
self.next_physical_page += 1
return allocated_page
def write_logical_block(self, lba: int, data: bytes) -> bool:
"""
Write data to a logical block address.
Handles FTL mapping and wear leveling.
"""
if lba >= self.logical_blocks:
raise ValueError(f"LBA {lba} exceeds logical capacity")
with self.lock:
lba_str = str(lba)
# Check if LBA already has a mapping
old_ppn = self.ftl_data["lba_to_ppn"].get(lba_str)
# Allocate new physical page
new_ppn = self._allocate_physical_page()
# Write data to physical page
success = self.virtual_flash.write_page(new_ppn, data)
if not success:
print(f"Failed to write to physical page {new_ppn}")
return False
# Update FTL mapping
self.ftl_data["lba_to_ppn"][lba_str] = new_ppn
self.ftl_data["ppn_to_lba"][str(new_ppn)] = lba
# Mark old page as invalid if it exists
if old_ppn is not None:
old_ppn_str = str(old_ppn)
if old_ppn_str in self.ftl_data["ppn_to_lba"]:
del self.ftl_data["ppn_to_lba"][old_ppn_str]
self.ftl_data["invalid_pages"].append(old_ppn)
self.ftl_data["write_count"] += 1
# Check if garbage collection is needed
if self._should_trigger_gc():
self._garbage_collect()
return True
def read_logical_block(self, lba: int) -> Optional[bytes]:
"""
Read data from a logical block address.
"""
if lba >= self.logical_blocks:
raise ValueError(f"LBA {lba} exceeds logical capacity")
with self.lock:
lba_str = str(lba)
ppn = self.ftl_data["lba_to_ppn"].get(lba_str)
if ppn is None:
# LBA not mapped, return empty data
return b'\x00' * self.virtual_flash.page_size
return self.virtual_flash.read_page(ppn)
def _should_trigger_gc(self) -> bool:
"""
Determine if garbage collection should be triggered.
"""
total_pages_used = len(self.ftl_data["lba_to_ppn"])
invalid_pages = len(self.ftl_data["invalid_pages"])
if total_pages_used == 0:
return False
invalid_ratio = invalid_pages / (total_pages_used + invalid_pages)
return invalid_ratio >= self.ftl_data["gc_threshold"]
    def _garbage_collect(self):
        """
        Perform garbage collection to reclaim invalid pages.
        This is a simplified implementation.

        Strategy: group invalid pages by erase block; any block that is more
        than half invalid has its still-valid pages relocated and is then
        erased. Invalid-page tracking is pruned only for successfully erased
        blocks, so pages in blocks whose erase failed remain queued for a
        later GC pass.
        """
        with self.lock:
            print("Starting garbage collection...")
            # Get blocks with invalid pages
            invalid_pages = self.ftl_data["invalid_pages"]
            if not invalid_pages:
                print("No invalid pages to collect")
                return
            # Group invalid pages by block (PPN // pages_per_block = block number)
            invalid_blocks = {}
            for ppn in invalid_pages:
                block_num = ppn // self.virtual_flash.pages_per_block
                if block_num not in invalid_blocks:
                    invalid_blocks[block_num] = []
                invalid_blocks[block_num].append(ppn)
            # For each block with significant invalid pages, consider erasing
            blocks_erased = 0
            for block_num, block_invalid_pages in invalid_blocks.items():
                # If more than 50% of block is invalid, erase it
                if len(block_invalid_pages) > self.virtual_flash.pages_per_block * 0.5:
                    # Move valid pages to new locations first, so the erase
                    # destroys only stale data.
                    self._relocate_valid_pages_in_block(block_num)
                    # Erase the block
                    if self.virtual_flash.erase_block(block_num):
                        blocks_erased += 1
                        # Remove invalid pages from tracking
                        # NOTE(review): list.remove inside this loop is O(n^2)
                        # for large invalid lists — acceptable for a simulator.
                        for ppn in block_invalid_pages:
                            if ppn in self.ftl_data["invalid_pages"]:
                                self.ftl_data["invalid_pages"].remove(ppn)
            # gc_count counts GC runs, not blocks erased.
            self.ftl_data["gc_count"] += 1
            print(f"Garbage collection complete: {blocks_erased} blocks erased")
def _relocate_valid_pages_in_block(self, block_num: int):
"""
Relocate valid pages in a block before erasing it.
"""
start_page = block_num * self.virtual_flash.pages_per_block
end_page = start_page + self.virtual_flash.pages_per_block
for ppn in range(start_page, end_page):
ppn_str = str(ppn)
if ppn_str in self.ftl_data["ppn_to_lba"]:
# This page is valid and needs to be relocated
lba = self.ftl_data["ppn_to_lba"][ppn_str]
# Read data from old location
data = self.virtual_flash.read_page(ppn)
if data:
# Allocate new physical page
new_ppn = self._allocate_physical_page()
# Write data to new location
if self.virtual_flash.write_page(new_ppn, data):
# Update FTL mapping
lba_str = str(lba)
self.ftl_data["lba_to_ppn"][lba_str] = new_ppn
self.ftl_data["ppn_to_lba"][str(new_ppn)] = lba
# Remove old mapping
del self.ftl_data["ppn_to_lba"][ppn_str]
def trim_logical_block(self, lba: int):
"""
Mark a logical block as unused (TRIM command).
"""
with self.lock:
lba_str = str(lba)
ppn = self.ftl_data["lba_to_ppn"].get(lba_str)
if ppn is not None:
# Remove mapping
del self.ftl_data["lba_to_ppn"][lba_str]
ppn_str = str(ppn)
if ppn_str in self.ftl_data["ppn_to_lba"]:
del self.ftl_data["ppn_to_lba"][ppn_str]
# Mark page as invalid
self.ftl_data["invalid_pages"].append(ppn)
def get_ftl_stats(self) -> Dict:
"""
Get FTL statistics.
"""
with self.lock:
mapped_lbas = len(self.ftl_data["lba_to_ppn"])
invalid_pages = len(self.ftl_data["invalid_pages"])
return {
"logical_blocks": self.logical_blocks,
"mapped_lbas": mapped_lbas,
"unmapped_lbas": self.logical_blocks - mapped_lbas,
"invalid_pages": invalid_pages,
"write_count": self.ftl_data["write_count"],
"gc_count": self.ftl_data["gc_count"],
"next_physical_page": self.next_physical_page
}
def get_mapping_table(self) -> Dict:
"""
Get the current LBA to PPN mapping table (for debugging).
"""
with self.lock:
return self.ftl_data["lba_to_ppn"].copy()
    def force_garbage_collection(self):
        """
        Force garbage collection to run.

        Bypasses the threshold check in _should_trigger_gc and invokes the
        collector directly; _garbage_collect takes the FTL lock itself.
        """
        self._garbage_collect()
    def export_snapshot(self) -> str:
        """
        Exports the current state of the FTL as a base64 encoded JSON string.

        NOTE(review): this method is re-defined verbatim near the end of the
        class; the later definition shadows this one, making this copy dead
        code. One of the two duplicates should be deleted.
        """
        with self.lock:
            snapshot_data = {
                "ftl_data": self.ftl_data,
                "next_physical_page": self.next_physical_page,
                "logical_blocks": self.logical_blocks,
                "pages_per_logical_block": self.pages_per_logical_block
            }
            return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8")
    def mount_snapshot(self, snapshot_json_or_url: str):
        """
        Loads the state of the FTL from a base64 encoded JSON string.

        NOTE(review): this method is re-defined verbatim near the end of the
        class; the later definition shadows this one, making this copy dead
        code. One of the two duplicates should be deleted.
        """
        with self.lock:
            try:
                decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8")
                snapshot_data = json.loads(decoded_data)
                self.ftl_data = snapshot_data["ftl_data"]
                self.next_physical_page = snapshot_data["next_physical_page"]
                # Geometry fields are optional; keep current values if absent.
                self.logical_blocks = snapshot_data.get("logical_blocks", self.logical_blocks)
                self.pages_per_logical_block = snapshot_data.get("pages_per_logical_block", self.pages_per_logical_block)
                print("SSDController state loaded from snapshot.")
            except Exception as e:
                print(f"Error loading FTL snapshot: {e}")
                raise
    def shutdown(self):
        """
        No specific shutdown needed for in-memory FTL, as state is not persisted to disk automatically.

        Callers that need persistence should capture export_snapshot() before
        shutting down.
        """
        print("SSDController shutdown complete")
def __del__(self):
try:
self.shutdown()
except:
pass
def export_snapshot(self) -> str:
"""
Exports the current state of the FTL as a base64 encoded JSON string.
"""
with self.lock:
snapshot_data = {
"ftl_data": self.ftl_data,
"next_physical_page": self.next_physical_page,
"logical_blocks": self.logical_blocks,
"pages_per_logical_block": self.pages_per_logical_block
}
return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8")
def mount_snapshot(self, snapshot_json_or_url: str):
"""
Loads the state of the FTL from a base64 encoded JSON string.
"""
with self.lock:
try:
decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8")
snapshot_data = json.loads(decoded_data)
self.ftl_data = snapshot_data["ftl_data"]
self.next_physical_page = snapshot_data["next_physical_page"]
self.logical_blocks = snapshot_data.get("logical_blocks", self.logical_blocks)
self.pages_per_logical_block = snapshot_data.get("pages_per_logical_block", self.pages_per_logical_block)
print("SSDController state loaded from snapshot.")
except Exception as e:
print(f"Error loading FTL snapshot: {e}")
raise