Spaces:
Runtime error
Runtime error
"""
SSD Controller Module - Implements Flash Translation Layer (FTL)
"""
import base64
import json
import os
import random
import threading
import time
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
class SSDController:
    """
    Implements Flash Translation Layer (FTL) functionality.

    Maps Logical Block Addresses (LBA) to Physical Page Numbers (PPN), tracks
    pages invalidated by overwrites and TRIM, and reclaims them through a
    simplified garbage collector. All FTL state lives in memory;
    ``export_snapshot`` / ``mount_snapshot`` serialise it for persistence.

    The ``virtual_flash`` object is used through the following members only:
    ``write_page(ppn, data)`` and ``erase_block(block_num)`` (truthy result on
    success), ``read_page(ppn)``, ``page_size`` and ``pages_per_block``.

    Mapping-table keys are stored as strings so that the tables survive a
    JSON round trip unchanged.
    """

    def __init__(self, virtual_flash, logical_blocks: int, pages_per_logical_block: int = 1):
        """
        Create a controller with empty mapping tables.

        Args:
            virtual_flash: Flash backend (see the class docstring for the
                members this class relies on).
            logical_blocks: Number of addressable LBAs; valid LBAs are
                ``0 .. logical_blocks - 1``.
            pages_per_logical_block: Stored and included in snapshots; not
                otherwise used by this class.
        """
        self.virtual_flash = virtual_flash
        self.logical_blocks = logical_blocks
        self.pages_per_logical_block = pages_per_logical_block
        # Re-entrant because public methods call private helpers that also
        # take the lock (e.g. write -> allocate, write -> garbage collect).
        self.lock = threading.RLock()
        # In-memory FTL mapping table
        self.ftl_data = {
            "lba_to_ppn": {},  # Logical Block Address to Physical Page Number
            "ppn_to_lba": {},  # Physical Page Number to Logical Block Address (reverse mapping)
            "invalid_pages": [],  # List of invalid physical pages
            "wear_leveling_threshold": 100,
            "gc_threshold": 0.8,  # Garbage collection threshold (80% invalid)
            "write_count": 0,
            "gc_count": 0,
        }
        # Track next available physical page for allocation
        self.next_physical_page = self._find_next_physical_page()
        print(f"SSDController initialized: {logical_blocks} logical blocks, FTL mapping in-memory")

    def _find_next_physical_page(self) -> int:
        """
        Return the first physical page above every page the FTL knows about.

        Allocation is append-only (``_allocate_physical_page`` just bumps a
        counter), so the starting point must be the high-water mark of all
        mapped *and* invalid pages. Returning the first gap instead would make
        later increments run into pages that are already programmed.
        """
        highest = -1
        for entry in self.ftl_data["lba_to_ppn"].values():
            # A mapping value may be a single PPN or a list of PPNs.
            pages = entry if isinstance(entry, list) else [entry]
            highest = max([highest, *pages])
        highest = max([highest, *self.ftl_data["invalid_pages"]])
        return highest + 1

    def _allocate_physical_page(self) -> int:
        """
        Allocate a new physical page.

        Simplified allocator: hands out ``next_physical_page`` and increments
        it. Erased pages are never handed out again and no capacity check is
        made here.
        """
        with self.lock:
            allocated_page = self.next_physical_page
            self.next_physical_page += 1
            return allocated_page

    def _check_lba(self, lba: int) -> None:
        """Raise ``ValueError`` unless ``0 <= lba < self.logical_blocks``."""
        if lba < 0:
            raise ValueError(f"LBA {lba} must be non-negative")
        if lba >= self.logical_blocks:
            raise ValueError(f"LBA {lba} exceeds logical capacity")

    def write_logical_block(self, lba: int, data: bytes) -> bool:
        """
        Write data to a logical block address.

        The data always goes to a freshly allocated physical page; the page
        previously mapped to ``lba`` (if any) is marked invalid. Garbage
        collection runs afterwards when the invalid-page ratio reaches
        ``gc_threshold``.

        Args:
            lba: Logical block address to write.
            data: Payload passed unchanged to ``virtual_flash.write_page``.

        Returns:
            True on success, False if the flash write failed (the mapping is
            left untouched in that case).

        Raises:
            ValueError: If ``lba`` is negative or beyond the logical capacity.
        """
        self._check_lba(lba)
        with self.lock:
            lba_str = str(lba)
            # Remember the current mapping so it can be invalidated afterwards.
            old_ppn = self.ftl_data["lba_to_ppn"].get(lba_str)
            new_ppn = self._allocate_physical_page()
            if not self.virtual_flash.write_page(new_ppn, data):
                print(f"Failed to write to physical page {new_ppn}")
                return False
            # Update FTL mapping
            self.ftl_data["lba_to_ppn"][lba_str] = new_ppn
            self.ftl_data["ppn_to_lba"][str(new_ppn)] = lba
            # Mark old page as invalid if it exists
            if old_ppn is not None:
                self.ftl_data["ppn_to_lba"].pop(str(old_ppn), None)
                self.ftl_data["invalid_pages"].append(old_ppn)
            self.ftl_data["write_count"] += 1
            if self._should_trigger_gc():
                self._garbage_collect()
            return True

    def read_logical_block(self, lba: int) -> Optional[bytes]:
        """
        Read data from a logical block address.

        Returns:
            Whatever ``virtual_flash.read_page`` returns for the mapped page,
            or ``page_size`` zero bytes when the LBA is not mapped.

        Raises:
            ValueError: If ``lba`` is negative or beyond the logical capacity.
        """
        self._check_lba(lba)
        with self.lock:
            ppn = self.ftl_data["lba_to_ppn"].get(str(lba))
            if ppn is None:
                # LBA not mapped, return empty data
                return b'\x00' * self.virtual_flash.page_size
            return self.virtual_flash.read_page(ppn)

    def _should_trigger_gc(self) -> bool:
        """
        Return True when invalid pages make up at least ``gc_threshold`` of
        all tracked (mapped + invalid) pages.
        """
        mapped = len(self.ftl_data["lba_to_ppn"])
        invalid = len(self.ftl_data["invalid_pages"])
        tracked = mapped + invalid
        # Guard the actual denominator rather than only the mapped count.
        if tracked == 0:
            return False
        return invalid / tracked >= self.ftl_data["gc_threshold"]

    def _garbage_collect(self):
        """
        Reclaim blocks in which more than half of the pages are invalid.

        For each such block the still-valid pages are relocated first. The
        block is erased only if every relocation succeeded, so a failed copy
        never leaves an LBA pointing at an erased page. ``gc_count`` is
        incremented once per run that found invalid pages to examine.
        """
        with self.lock:
            print("Starting garbage collection...")
            invalid_pages = self.ftl_data["invalid_pages"]
            if not invalid_pages:
                print("No invalid pages to collect")
                return
            pages_per_block = self.virtual_flash.pages_per_block
            # Group invalid pages by the erase block that contains them.
            invalid_by_block = defaultdict(list)
            for ppn in invalid_pages:
                invalid_by_block[ppn // pages_per_block].append(ppn)
            blocks_erased = 0
            for block_num, block_invalid_pages in invalid_by_block.items():
                # Only blocks that are more than 50% invalid are worth erasing.
                if len(block_invalid_pages) <= pages_per_block * 0.5:
                    continue
                if not self._relocate_valid_pages_in_block(block_num):
                    # Erasing now would destroy the pages that could not be moved.
                    print(f"Skipping erase of block {block_num}: relocation of valid pages failed")
                    continue
                if self.virtual_flash.erase_block(block_num):
                    blocks_erased += 1
                    # Drop every invalid page of the erased block in one pass
                    # (this includes pages invalidated by the relocation).
                    self.ftl_data["invalid_pages"] = [
                        ppn
                        for ppn in self.ftl_data["invalid_pages"]
                        if ppn // pages_per_block != block_num
                    ]
            self.ftl_data["gc_count"] += 1
            print(f"Garbage collection complete: {blocks_erased} blocks erased")

    def _relocate_valid_pages_in_block(self, block_num: int) -> bool:
        """
        Copy every still-mapped page of ``block_num`` to a newly allocated page.

        Successfully moved source pages are recorded as invalid so they stay
        tracked even if the block ends up not being erased.

        Returns:
            True if all valid pages were moved (or there were none), False if
            any page could not be read or rewritten; such pages keep their
            original mapping.
        """
        pages_per_block = self.virtual_flash.pages_per_block
        start_page = block_num * pages_per_block
        end_page = start_page + pages_per_block
        with self.lock:
            # Never pick a relocation target inside the block that is about to
            # be erased, otherwise the relocated copy is wiped by the erase.
            if start_page <= self.next_physical_page < end_page:
                self.next_physical_page = end_page
            all_moved = True
            for ppn in range(start_page, end_page):
                ppn_str = str(ppn)
                lba = self.ftl_data["ppn_to_lba"].get(ppn_str)
                if lba is None:
                    continue  # page is not mapped: nothing to preserve
                data = self.virtual_flash.read_page(ppn)
                # NOTE(review): assumes read_page signals failure with None;
                # an empty payload is treated as valid data and copied.
                if data is None:
                    all_moved = False
                    continue
                new_ppn = self._allocate_physical_page()
                if not self.virtual_flash.write_page(new_ppn, data):
                    all_moved = False
                    continue
                # Update FTL mapping and retire the old page.
                self.ftl_data["lba_to_ppn"][str(lba)] = new_ppn
                self.ftl_data["ppn_to_lba"][str(new_ppn)] = lba
                del self.ftl_data["ppn_to_lba"][ppn_str]
                self.ftl_data["invalid_pages"].append(ppn)
            return all_moved

    def trim_logical_block(self, lba: int):
        """
        Mark a logical block as unused (TRIM command).

        Removes the mapping for ``lba`` and records its physical page as
        invalid. An unmapped (or out-of-range) LBA is a no-op.
        """
        with self.lock:
            ppn = self.ftl_data["lba_to_ppn"].pop(str(lba), None)
            if ppn is None:
                return
            self.ftl_data["ppn_to_lba"].pop(str(ppn), None)
            # Mark page as invalid
            self.ftl_data["invalid_pages"].append(ppn)

    def get_ftl_stats(self) -> Dict:
        """
        Get FTL statistics.

        Returns:
            Dict with ``logical_blocks``, ``mapped_lbas``, ``unmapped_lbas``,
            ``invalid_pages`` (a count), ``write_count``, ``gc_count`` and
            ``next_physical_page``.
        """
        with self.lock:
            mapped_lbas = len(self.ftl_data["lba_to_ppn"])
            return {
                "logical_blocks": self.logical_blocks,
                "mapped_lbas": mapped_lbas,
                "unmapped_lbas": self.logical_blocks - mapped_lbas,
                "invalid_pages": len(self.ftl_data["invalid_pages"]),
                "write_count": self.ftl_data["write_count"],
                "gc_count": self.ftl_data["gc_count"],
                "next_physical_page": self.next_physical_page,
            }

    def get_mapping_table(self) -> Dict:
        """
        Get a shallow copy of the current LBA to PPN mapping table (for debugging).
        """
        with self.lock:
            return self.ftl_data["lba_to_ppn"].copy()

    def force_garbage_collection(self):
        """
        Run garbage collection now, regardless of the invalid-page ratio.
        """
        self._garbage_collect()

    def export_snapshot(self) -> str:
        """
        Export the current FTL state as a base64 encoded JSON string.

        The snapshot holds ``ftl_data``, ``next_physical_page``,
        ``logical_blocks`` and ``pages_per_logical_block``; flash contents are
        not included.
        """
        with self.lock:
            snapshot_data = {
                "ftl_data": self.ftl_data,
                "next_physical_page": self.next_physical_page,
                "logical_blocks": self.logical_blocks,
                "pages_per_logical_block": self.pages_per_logical_block,
            }
            return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8")

    def mount_snapshot(self, snapshot_json_or_url: str):
        """
        Load FTL state from a base64 encoded JSON string.

        Despite the parameter name, only the base64 form produced by
        ``export_snapshot`` is handled here. The snapshot is decoded and all
        fields are extracted before any attribute is assigned, so a malformed
        snapshot leaves the controller state untouched.

        Raises:
            Exception: Any decoding/parsing error, or ``KeyError`` when
                ``ftl_data`` or ``next_physical_page`` is missing; the error is
                printed and re-raised.
        """
        with self.lock:
            try:
                decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8")
                snapshot_data = json.loads(decoded_data)
                ftl_data = snapshot_data["ftl_data"]
                next_physical_page = snapshot_data["next_physical_page"]
                logical_blocks = snapshot_data.get("logical_blocks", self.logical_blocks)
                pages_per_logical_block = snapshot_data.get(
                    "pages_per_logical_block", self.pages_per_logical_block
                )
            except Exception as e:
                print(f"Error loading FTL snapshot: {e}")
                raise
            # Commit only after everything was extracted successfully.
            self.ftl_data = ftl_data
            self.next_physical_page = next_physical_page
            self.logical_blocks = logical_blocks
            self.pages_per_logical_block = pages_per_logical_block
            print("SSDController state loaded from snapshot.")

    def shutdown(self):
        """
        No specific shutdown needed for in-memory FTL, as state is not persisted to disk automatically.
        """
        print("SSDController shutdown complete")

    def __del__(self):
        """Best-effort shutdown when the object is finalised."""
        try:
            self.shutdown()
        except Exception:
            # Finalisers may run during interpreter teardown, when even
            # print() can fail; there is nothing useful to do about it here.
            pass