""" Virtual Flash Module - Simulates NAND Flash behavior """ import os import json import threading import time import random import base64 from typing import Dict, Optional, Tuple from builtins import open class VirtualFlash: """ Simulates NAND Flash behavior with pages, blocks, and wear leveling. Stores data entirely in-memory and provides snapshot functionality for persistence. """ def __init__(self, capacity_gb: int = 2048, page_size: int = 4096, pages_per_block: int = 256): self.capacity_gb = capacity_gb self.page_size = page_size # 4KB pages self.pages_per_block = pages_per_block # 256 pages per block self.block_size = page_size * pages_per_block # 1MB blocks # Calculate total pages and blocks self.total_bytes = capacity_gb * 1024 * 1024 * 1024 self.total_pages = self.total_bytes // page_size self.total_blocks = self.total_pages // pages_per_block # In-memory storage for flash pages: {page_number: data_bytes} self.pages: Dict[int, bytes] = {} # In-memory storage for block metadata: {block_number: {'erase_count': int, 'bad_block': bool, 'last_erase_time': float}} self.block_metadata: Dict[int, Dict] = {} # Global metadata self.metadata = { "total_writes": 0, "total_erases": 0, "bad_blocks": [], "wear_leveling_threshold": 1000, "power_cycles": 0 } # Thread lock for thread safety self.lock = threading.RLock() print(f"VirtualFlash initialized: {capacity_gb}GB, {self.total_pages:,} pages, {self.total_blocks:,} blocks") def get_page_address(self, page_number: int) -> Tuple[int, int]: """Convert page number to block and page within block.""" block_number = page_number // self.pages_per_block page_in_block = page_number % self.pages_per_block return block_number, page_in_block def get_page_number(self, block_number: int, page_in_block: int) -> int: """Convert block and page within block to page number.""" return block_number * self.pages_per_block + page_in_block def write_page(self, page_number: int, data: bytes) -> bool: """ Write data to a specific page. Simulates NAND flash write behavior. """ if page_number >= self.total_pages: raise ValueError(f"Page number {page_number} exceeds capacity") if len(data) > self.page_size: raise ValueError(f"Data size {len(data)} exceeds page size {self.page_size}") # Pad data to page size if len(data) < self.page_size: data = data + b'\x00' * (self.page_size - len(data)) block_number, page_in_block = self.get_page_address(page_number) with self.lock: # Check if block is bad if block_number in self.metadata.get("bad_blocks", []): return False # Simulate write delay time.sleep(0.0001) # 0.1ms write delay # Update page data self.pages[page_number] = data # Update block metadata (initialize if not exists) if block_number not in self.block_metadata: self.block_metadata[block_number] = {'erase_count': 0, 'bad_block': False, 'last_erase_time': time.time()} # Update global metadata self.metadata["total_writes"] += 1 # Simulate wear (very small chance of block going bad) if random.random() < 0.000001: # 1 in 1,000,000 chance self.metadata["bad_blocks"].append(block_number) self.block_metadata[block_number]['bad_block'] = True print(f"Block {block_number} marked as bad due to wear") return True def read_page(self, page_number: int) -> Optional[bytes]: """ Read data from a specific page. Returns None if page doesn't exist or is invalid. """ if page_number >= self.total_pages: raise ValueError(f"Page number {page_number} exceeds capacity") with self.lock: # Simulate read delay time.sleep(0.00005) # 0.05ms read delay # Check if block is bad block_number, _ = self.get_page_address(page_number) if self.block_metadata.get(block_number, {}).get('bad_block', False): return b'\x00' * self.page_size # Return empty for bad blocks return self.pages.get(page_number, b'\x00' * self.page_size) def erase_block(self, block_number: int) -> bool: """ Erase an entire block (mark all pages as invalid). Simulates block erase operation. """ if block_number >= self.total_blocks: raise ValueError(f"Block number {block_number} exceeds capacity") with self.lock: # Check if block is bad if self.block_metadata.get(block_number, {}).get('bad_block', False): return False # Simulate erase delay (much longer than write) time.sleep(0.002) # 2ms erase delay # Remove pages belonging to this block pages_to_remove = [p_num for p_num in self.pages if self.get_page_address(p_num)[0] == block_number] for p_num in pages_to_remove: del self.pages[p_num] # Update block metadata if block_number not in self.block_metadata: self.block_metadata[block_number] = {'erase_count': 0, 'bad_block': False, 'last_erase_time': time.time()} self.block_metadata[block_number]['erase_count'] += 1 self.block_metadata[block_number]['last_erase_time'] = time.time() # Update global metadata self.metadata["total_erases"] += 1 return True def get_block_info(self, block_number: int) -> Dict: """Get information about a specific block.""" with self.lock: block_meta = self.block_metadata.get(block_number, {'erase_count': 0, 'bad_block': False, 'last_erase_time': 0}) valid_pages = sum(1 for p_num in self.pages if self.get_page_address(p_num)[0] == block_number) return { "block_number": block_number, "erase_count": block_meta['erase_count'], "is_bad": block_meta['bad_block'], "last_erase_time": block_meta['last_erase_time'], "valid_pages": valid_pages, "total_pages": self.pages_per_block } def get_flash_stats(self) -> Dict: """Get overall flash statistics.""" with self.lock: written_pages = len(self.pages) used_blocks = len(set(self.get_page_address(p_num)[0] for p_num in self.pages)) used_bytes = written_pages * self.page_size usage_percent = (used_bytes / self.total_bytes) * 100 if self.total_bytes > 0 else 0 return { "capacity_gb": self.capacity_gb, "total_bytes": self.total_bytes, "used_bytes": used_bytes, "free_bytes": self.total_bytes - used_bytes, "usage_percent": usage_percent, "total_pages": self.total_pages, "written_pages": written_pages, "free_pages": self.total_pages - written_pages, "total_blocks": self.total_blocks, "used_blocks": used_blocks, "free_blocks": self.total_blocks - used_blocks, "bad_blocks": len(self.metadata.get("bad_blocks", [])), "total_erases": self.metadata.get("total_erases", 0) } def export_snapshot(self) -> str: """ Exports the current state of the virtual flash as a base64 encoded JSON string. """ with self.lock: snapshot_data = { "pages": {str(k): base64.b64encode(v).decode("utf-8") for k, v in self.pages.items()}, "block_metadata": self.block_metadata, "metadata": self.metadata, "capacity_gb": self.capacity_gb, "page_size": self.page_size, "pages_per_block": self.pages_per_block } return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8") def mount_snapshot(self, snapshot_json_or_url: str): """ Loads the state of the virtual flash from a base64 encoded JSON string. """ with self.lock: try: decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8") snapshot_data = json.loads(decoded_data) self.capacity_gb = snapshot_data.get("capacity_gb", self.capacity_gb) self.page_size = snapshot_data.get("page_size", self.page_size) self.pages_per_block = snapshot_data.get("pages_per_block", self.pages_per_block) self.total_bytes = self.capacity_gb * 1024 * 1024 * 1024 self.total_pages = self.total_bytes // self.page_size self.total_blocks = self.total_pages // self.pages_per_block self.pages = {int(k): base64.b64decode(v) for k, v in snapshot_data["pages"].items()} self.block_metadata = snapshot_data["block_metadata"] self.metadata = snapshot_data["metadata"] print("VirtualFlash state loaded from snapshot.") except Exception as e: print(f"Error loading snapshot: {e}") raise def shutdown(self): """ No specific shutdown needed for in-memory flash, as state is not persisted to disk automatically. """ print("VirtualFlash shutdown complete") def __del__(self): """ Destructor to ensure proper cleanup. """ try: self.shutdown() except: pass