""" SSD Controller Module - Implements Flash Translation Layer (FTL) """ import json import os import time import threading import random import base64 from typing import Dict, List, Optional, Tuple class SSDController: """ Implements Flash Translation Layer (FTL) functionality. Maps Logical Block Addresses (LBA) to Physical Page Numbers (PPN). Handles wear leveling and garbage collection. Stores FTL data entirely in-memory and provides snapshot functionality for persistence. """ def __init__(self, virtual_flash, logical_blocks: int, pages_per_logical_block: int = 1): self.virtual_flash = virtual_flash self.logical_blocks = logical_blocks self.pages_per_logical_block = pages_per_logical_block self.lock = threading.RLock() # In-memory FTL mapping table self.ftl_data = { "lba_to_ppn": {}, # Logical Block Address to Physical Page Number "ppn_to_lba": {}, # Physical Page Number to Logical Block Address (reverse mapping) "invalid_pages": [], # List of invalid physical pages "wear_leveling_threshold": 100, "gc_threshold": 0.8, # Garbage collection threshold (80% full) "write_count": 0, "gc_count": 0 } # Track next available physical page for allocation self.next_physical_page = self._find_next_physical_page() print(f"SSDController initialized: {logical_blocks} logical blocks, FTL mapping in-memory") def _find_next_physical_page(self) -> int: """Find the next available physical page for allocation.""" used_pages = set() for ppn_str in self.ftl_data["lba_to_ppn"].values(): if isinstance(ppn_str, list): used_pages.update(ppn_str) else: used_pages.add(ppn_str) # Find first unused page page = 0 while page in used_pages: page += 1 return page def _allocate_physical_page(self) -> int: """ Allocate a new physical page. This is a simplified allocation that just increments the next_physical_page counter. In a real FTL, this would involve finding free pages in erase blocks. """ with self.lock: # Find next available page # This simplified logic assumes continuous allocation and doesn't reuse freed pages efficiently # A more advanced FTL would manage free page lists within blocks. allocated_page = self.next_physical_page self.next_physical_page += 1 return allocated_page def write_logical_block(self, lba: int, data: bytes) -> bool: """ Write data to a logical block address. Handles FTL mapping and wear leveling. """ if lba >= self.logical_blocks: raise ValueError(f"LBA {lba} exceeds logical capacity") with self.lock: lba_str = str(lba) # Check if LBA already has a mapping old_ppn = self.ftl_data["lba_to_ppn"].get(lba_str) # Allocate new physical page new_ppn = self._allocate_physical_page() # Write data to physical page success = self.virtual_flash.write_page(new_ppn, data) if not success: print(f"Failed to write to physical page {new_ppn}") return False # Update FTL mapping self.ftl_data["lba_to_ppn"][lba_str] = new_ppn self.ftl_data["ppn_to_lba"][str(new_ppn)] = lba # Mark old page as invalid if it exists if old_ppn is not None: old_ppn_str = str(old_ppn) if old_ppn_str in self.ftl_data["ppn_to_lba"]: del self.ftl_data["ppn_to_lba"][old_ppn_str] self.ftl_data["invalid_pages"].append(old_ppn) self.ftl_data["write_count"] += 1 # Check if garbage collection is needed if self._should_trigger_gc(): self._garbage_collect() return True def read_logical_block(self, lba: int) -> Optional[bytes]: """ Read data from a logical block address. """ if lba >= self.logical_blocks: raise ValueError(f"LBA {lba} exceeds logical capacity") with self.lock: lba_str = str(lba) ppn = self.ftl_data["lba_to_ppn"].get(lba_str) if ppn is None: # LBA not mapped, return empty data return b'\x00' * self.virtual_flash.page_size return self.virtual_flash.read_page(ppn) def _should_trigger_gc(self) -> bool: """ Determine if garbage collection should be triggered. """ total_pages_used = len(self.ftl_data["lba_to_ppn"]) invalid_pages = len(self.ftl_data["invalid_pages"]) if total_pages_used == 0: return False invalid_ratio = invalid_pages / (total_pages_used + invalid_pages) return invalid_ratio >= self.ftl_data["gc_threshold"] def _garbage_collect(self): """ Perform garbage collection to reclaim invalid pages. This is a simplified implementation. """ with self.lock: print("Starting garbage collection...") # Get blocks with invalid pages invalid_pages = self.ftl_data["invalid_pages"] if not invalid_pages: print("No invalid pages to collect") return # Group invalid pages by block invalid_blocks = {} for ppn in invalid_pages: block_num = ppn // self.virtual_flash.pages_per_block if block_num not in invalid_blocks: invalid_blocks[block_num] = [] invalid_blocks[block_num].append(ppn) # For each block with significant invalid pages, consider erasing blocks_erased = 0 for block_num, block_invalid_pages in invalid_blocks.items(): # If more than 50% of block is invalid, erase it if len(block_invalid_pages) > self.virtual_flash.pages_per_block * 0.5: # Move valid pages to new locations first self._relocate_valid_pages_in_block(block_num) # Erase the block if self.virtual_flash.erase_block(block_num): blocks_erased += 1 # Remove invalid pages from tracking for ppn in block_invalid_pages: if ppn in self.ftl_data["invalid_pages"]: self.ftl_data["invalid_pages"].remove(ppn) self.ftl_data["gc_count"] += 1 print(f"Garbage collection complete: {blocks_erased} blocks erased") def _relocate_valid_pages_in_block(self, block_num: int): """ Relocate valid pages in a block before erasing it. """ start_page = block_num * self.virtual_flash.pages_per_block end_page = start_page + self.virtual_flash.pages_per_block for ppn in range(start_page, end_page): ppn_str = str(ppn) if ppn_str in self.ftl_data["ppn_to_lba"]: # This page is valid and needs to be relocated lba = self.ftl_data["ppn_to_lba"][ppn_str] # Read data from old location data = self.virtual_flash.read_page(ppn) if data: # Allocate new physical page new_ppn = self._allocate_physical_page() # Write data to new location if self.virtual_flash.write_page(new_ppn, data): # Update FTL mapping lba_str = str(lba) self.ftl_data["lba_to_ppn"][lba_str] = new_ppn self.ftl_data["ppn_to_lba"][str(new_ppn)] = lba # Remove old mapping del self.ftl_data["ppn_to_lba"][ppn_str] def trim_logical_block(self, lba: int): """ Mark a logical block as unused (TRIM command). """ with self.lock: lba_str = str(lba) ppn = self.ftl_data["lba_to_ppn"].get(lba_str) if ppn is not None: # Remove mapping del self.ftl_data["lba_to_ppn"][lba_str] ppn_str = str(ppn) if ppn_str in self.ftl_data["ppn_to_lba"]: del self.ftl_data["ppn_to_lba"][ppn_str] # Mark page as invalid self.ftl_data["invalid_pages"].append(ppn) def get_ftl_stats(self) -> Dict: """ Get FTL statistics. """ with self.lock: mapped_lbas = len(self.ftl_data["lba_to_ppn"]) invalid_pages = len(self.ftl_data["invalid_pages"]) return { "logical_blocks": self.logical_blocks, "mapped_lbas": mapped_lbas, "unmapped_lbas": self.logical_blocks - mapped_lbas, "invalid_pages": invalid_pages, "write_count": self.ftl_data["write_count"], "gc_count": self.ftl_data["gc_count"], "next_physical_page": self.next_physical_page } def get_mapping_table(self) -> Dict: """ Get the current LBA to PPN mapping table (for debugging). """ with self.lock: return self.ftl_data["lba_to_ppn"].copy() def force_garbage_collection(self): """ Force garbage collection to run. """ self._garbage_collect() def export_snapshot(self) -> str: """ Exports the current state of the FTL as a base64 encoded JSON string. """ with self.lock: snapshot_data = { "ftl_data": self.ftl_data, "next_physical_page": self.next_physical_page, "logical_blocks": self.logical_blocks, "pages_per_logical_block": self.pages_per_logical_block } return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8") def mount_snapshot(self, snapshot_json_or_url: str): """ Loads the state of the FTL from a base64 encoded JSON string. """ with self.lock: try: decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8") snapshot_data = json.loads(decoded_data) self.ftl_data = snapshot_data["ftl_data"] self.next_physical_page = snapshot_data["next_physical_page"] self.logical_blocks = snapshot_data.get("logical_blocks", self.logical_blocks) self.pages_per_logical_block = snapshot_data.get("pages_per_logical_block", self.pages_per_logical_block) print("SSDController state loaded from snapshot.") except Exception as e: print(f"Error loading FTL snapshot: {e}") raise def shutdown(self): """ No specific shutdown needed for in-memory FTL, as state is not persisted to disk automatically. """ print("SSDController shutdown complete") def __del__(self): try: self.shutdown() except: pass def export_snapshot(self) -> str: """ Exports the current state of the FTL as a base64 encoded JSON string. """ with self.lock: snapshot_data = { "ftl_data": self.ftl_data, "next_physical_page": self.next_physical_page, "logical_blocks": self.logical_blocks, "pages_per_logical_block": self.pages_per_logical_block } return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8") def mount_snapshot(self, snapshot_json_or_url: str): """ Loads the state of the FTL from a base64 encoded JSON string. """ with self.lock: try: decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8") snapshot_data = json.loads(decoded_data) self.ftl_data = snapshot_data["ftl_data"] self.next_physical_page = snapshot_data["next_physical_page"] self.logical_blocks = snapshot_data.get("logical_blocks", self.logical_blocks) self.pages_per_logical_block = snapshot_data.get("pages_per_logical_block", self.pages_per_logical_block) print("SSDController state loaded from snapshot.") except Exception as e: print(f"Error loading FTL snapshot: {e}") raise