Spaces:
Runtime error
Runtime error
| """ | |
| Virtual Flash Module - Simulates NAND Flash behavior | |
| """ | |
| import os | |
| import json | |
| import threading | |
| import time | |
| import random | |
| import base64 | |
| from typing import Dict, Optional, Tuple | |
| from builtins import open | |
class VirtualFlash:
    """
    In-memory simulation of a NAND flash device organised as pages and blocks.

    Pages are kept in a dict keyed by absolute page number; a page that is
    absent from the dict reads back as all zero bytes. Per-block bookkeeping
    (erase count, bad-block flag, last erase time) is kept in a second dict
    keyed by block number. Nothing is written to disk: use
    ``export_snapshot`` / ``mount_snapshot`` to persist and restore state.

    Every method that touches device state holds ``self.lock`` (an RLock).
    """

    def __init__(self, capacity_gb: int = 2048, page_size: int = 4096, pages_per_block: int = 256):
        """
        Create an empty device.

        Args:
            capacity_gb: Device capacity in GiB (multiplied by 1024**3).
            page_size: Size of one page in bytes.
            pages_per_block: Number of pages grouped into one erase block.
        """
        self.capacity_gb = capacity_gb
        self.page_size = page_size
        self.pages_per_block = pages_per_block
        (self.block_size, self.total_bytes,
         self.total_pages, self.total_blocks) = self._geometry(capacity_gb, page_size, pages_per_block)

        # In-memory storage for flash pages: {page_number: data_bytes}
        self.pages: Dict[int, bytes] = {}
        # Per-block records:
        # {block_number: {'erase_count': int, 'bad_block': bool, 'last_erase_time': float}}
        self.block_metadata: Dict[int, Dict] = {}
        # Device-wide counters and the list of bad block numbers
        self.metadata = self._default_metadata()

        # Re-entrant so public methods may call each other while holding it
        self.lock = threading.RLock()

        print(f"VirtualFlash initialized: {capacity_gb}GB, {self.total_pages:,} pages, {self.total_blocks:,} blocks")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _geometry(capacity_gb: int, page_size: int, pages_per_block: int) -> Tuple[int, int, int, int]:
        """Return (block_size, total_bytes, total_pages, total_blocks) for the given layout."""
        total_bytes = capacity_gb * 1024 * 1024 * 1024
        total_pages = total_bytes // page_size
        total_blocks = total_pages // pages_per_block
        return page_size * pages_per_block, total_bytes, total_pages, total_blocks

    @staticmethod
    def _default_metadata() -> Dict:
        """Return a fresh copy of the device-wide metadata with all counters at zero."""
        return {
            "total_writes": 0,
            "total_erases": 0,
            "bad_blocks": [],
            "wear_leveling_threshold": 1000,
            "power_cycles": 0
        }

    def _is_bad_block(self, block_number: int) -> bool:
        """Return True if the block is flagged bad in either bookkeeping structure."""
        if block_number in self.metadata.get("bad_blocks", []):
            return True
        return bool(self.block_metadata.get(block_number, {}).get('bad_block', False))

    def _pages_in_block(self, block_number: int) -> list:
        """Return the stored page numbers that belong to ``block_number``."""
        first = block_number * self.pages_per_block
        end = first + self.pages_per_block
        # Walk whichever collection is smaller: the stored pages or the block's range.
        if len(self.pages) <= self.pages_per_block:
            return [p for p in self.pages if first <= p < end]
        return [p for p in range(first, end) if p in self.pages]

    def _check_page_number(self, page_number: int) -> None:
        """Raise ValueError unless ``0 <= page_number < total_pages``."""
        if page_number < 0:
            raise ValueError(f"Page number {page_number} must not be negative")
        if page_number >= self.total_pages:
            raise ValueError(f"Page number {page_number} exceeds capacity")

    # ------------------------------------------------------------------
    # Addressing
    # ------------------------------------------------------------------
    def get_page_address(self, page_number: int) -> Tuple[int, int]:
        """Convert page number to block and page within block."""
        return divmod(page_number, self.pages_per_block)

    def get_page_number(self, block_number: int, page_in_block: int) -> int:
        """Convert block and page within block to page number."""
        return block_number * self.pages_per_block + page_in_block

    # ------------------------------------------------------------------
    # Page / block operations
    # ------------------------------------------------------------------
    def write_page(self, page_number: int, data: bytes) -> bool:
        """
        Write data to a specific page, zero-padding it to the page size.

        Args:
            page_number: Absolute page number, ``0 <= page_number < total_pages``.
            data: Payload of at most ``page_size`` bytes.

        Returns:
            True if the page was stored, False if its block is marked bad.

        Raises:
            ValueError: If the page number is out of range or the data is
                larger than one page.
        """
        self._check_page_number(page_number)
        if len(data) > self.page_size:
            raise ValueError(f"Data size {len(data)} exceeds page size {self.page_size}")

        # Store an immutable, full-page copy padded with zero bytes
        data = bytes(data).ljust(self.page_size, b'\x00')
        block_number, _ = self.get_page_address(page_number)

        with self.lock:
            if self._is_bad_block(block_number):
                return False

            # Simulate write delay
            time.sleep(0.0001)  # 0.1ms write delay

            self.pages[page_number] = data

            # Create the block record on first use
            if block_number not in self.block_metadata:
                self.block_metadata[block_number] = {
                    'erase_count': 0, 'bad_block': False, 'last_erase_time': time.time()
                }

            self.metadata["total_writes"] += 1

            # Simulate wear (very small chance of block going bad)
            if random.random() < 0.000001:  # 1 in 1,000,000 chance
                self.metadata["bad_blocks"].append(block_number)
                self.block_metadata[block_number]['bad_block'] = True
                print(f"Block {block_number} marked as bad due to wear")

            return True

    def read_page(self, page_number: int) -> Optional[bytes]:
        """
        Read data from a specific page.

        Returns:
            The stored page contents. A page that was never written, or one
            that lies in a bad block, reads back as ``page_size`` zero bytes.

        Raises:
            ValueError: If the page number is out of range.
        """
        self._check_page_number(page_number)

        with self.lock:
            # Simulate read delay
            time.sleep(0.00005)  # 0.05ms read delay

            blank = b'\x00' * self.page_size
            block_number, _ = self.get_page_address(page_number)
            if self._is_bad_block(block_number):
                return blank  # Return empty for bad blocks
            return self.pages.get(page_number, blank)

    def erase_block(self, block_number: int) -> bool:
        """
        Erase an entire block, dropping every stored page that belongs to it.

        Returns:
            True if the block was erased, False if it is marked bad.

        Raises:
            ValueError: If the block number is out of range.
        """
        if block_number < 0:
            raise ValueError(f"Block number {block_number} must not be negative")
        if block_number >= self.total_blocks:
            raise ValueError(f"Block number {block_number} exceeds capacity")

        with self.lock:
            if self._is_bad_block(block_number):
                return False

            # Simulate erase delay (much longer than write)
            time.sleep(0.002)  # 2ms erase delay

            for p_num in self._pages_in_block(block_number):
                del self.pages[p_num]

            # Create the block record on first use, then count this erase
            if block_number not in self.block_metadata:
                self.block_metadata[block_number] = {
                    'erase_count': 0, 'bad_block': False, 'last_erase_time': time.time()
                }
            record = self.block_metadata[block_number]
            record['erase_count'] += 1
            record['last_erase_time'] = time.time()

            self.metadata["total_erases"] += 1
            return True

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------
    def get_block_info(self, block_number: int) -> Dict:
        """
        Get information about a specific block.

        A block with no record yet is reported with an erase count of 0,
        not bad, and a last erase time of 0.
        """
        with self.lock:
            block_meta = self.block_metadata.get(
                block_number, {'erase_count': 0, 'bad_block': False, 'last_erase_time': 0}
            )
            return {
                "block_number": block_number,
                "erase_count": block_meta['erase_count'],
                "is_bad": block_meta['bad_block'],
                "last_erase_time": block_meta['last_erase_time'],
                "valid_pages": len(self._pages_in_block(block_number)),
                "total_pages": self.pages_per_block
            }

    def get_flash_stats(self) -> Dict:
        """Get overall flash statistics (capacity, usage, block and erase counts)."""
        with self.lock:
            written_pages = len(self.pages)
            used_blocks = len({p_num // self.pages_per_block for p_num in self.pages})
            used_bytes = written_pages * self.page_size
            usage_percent = (used_bytes / self.total_bytes) * 100 if self.total_bytes > 0 else 0
            return {
                "capacity_gb": self.capacity_gb,
                "total_bytes": self.total_bytes,
                "used_bytes": used_bytes,
                "free_bytes": self.total_bytes - used_bytes,
                "usage_percent": usage_percent,
                "total_pages": self.total_pages,
                "written_pages": written_pages,
                "free_pages": self.total_pages - written_pages,
                "total_blocks": self.total_blocks,
                "used_blocks": used_blocks,
                "free_blocks": self.total_blocks - used_blocks,
                "bad_blocks": len(self.metadata.get("bad_blocks", [])),
                "total_erases": self.metadata.get("total_erases", 0)
            }

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def export_snapshot(self) -> str:
        """
        Exports the current state of the virtual flash as a base64 encoded JSON string.

        Page contents are base64 encoded individually. JSON object keys are
        always strings, so page and block numbers are stringified in the
        snapshot; ``mount_snapshot`` converts them back to integers.
        """
        with self.lock:
            snapshot_data = {
                "pages": {str(k): base64.b64encode(v).decode("utf-8") for k, v in self.pages.items()},
                "block_metadata": self.block_metadata,
                "metadata": self.metadata,
                "capacity_gb": self.capacity_gb,
                "page_size": self.page_size,
                "pages_per_block": self.pages_per_block
            }
            return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8")

    def mount_snapshot(self, snapshot_json_or_url: str):
        """
        Loads the state of the virtual flash from a base64 encoded JSON string.

        The snapshot is fully decoded and validated before any attribute is
        replaced, so a malformed snapshot leaves the device unchanged.

        Args:
            snapshot_json_or_url: A string produced by ``export_snapshot``.
                Despite the parameter name, only the base64 JSON form is
                handled here; URLs are not fetched.

        Raises:
            Exception: Whatever decoding/parsing error occurred (for example
                ``binascii.Error``, ``json.JSONDecodeError``, ``KeyError``
                for a missing section, ``ValueError`` for a non-integer key).
                The error is printed and then re-raised.
        """
        with self.lock:
            try:
                decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8")
                snapshot_data = json.loads(decoded_data)

                capacity_gb = snapshot_data.get("capacity_gb", self.capacity_gb)
                page_size = snapshot_data.get("page_size", self.page_size)
                pages_per_block = snapshot_data.get("pages_per_block", self.pages_per_block)
                geometry = self._geometry(capacity_gb, page_size, pages_per_block)

                pages = {int(k): base64.b64decode(v) for k, v in snapshot_data["pages"].items()}
                # JSON turned the integer block numbers into strings; restore
                # them so integer lookups find the records again.
                block_metadata = {
                    int(k): {'erase_count': 0, 'bad_block': False, 'last_erase_time': 0, **v}
                    for k, v in snapshot_data["block_metadata"].items()
                }
                # Fill in any counters an older/partial snapshot may lack
                metadata = {**self._default_metadata(), **snapshot_data["metadata"]}
                metadata["bad_blocks"] = list(metadata["bad_blocks"])
            except Exception as e:
                print(f"Error loading snapshot: {e}")
                raise

            # Everything parsed: commit the new state in one go
            self.capacity_gb = capacity_gb
            self.page_size = page_size
            self.pages_per_block = pages_per_block
            (self.block_size, self.total_bytes,
             self.total_pages, self.total_blocks) = geometry
            self.pages = pages
            self.block_metadata = block_metadata
            self.metadata = metadata
            print("VirtualFlash state loaded from snapshot.")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def shutdown(self):
        """
        No specific shutdown needed for in-memory flash, as state is not persisted to disk automatically.
        """
        print("VirtualFlash shutdown complete")

    def __del__(self):
        """
        Destructor: best-effort call to ``shutdown``; errors are ignored
        because nothing useful can be done with them during finalisation.
        """
        try:
            self.shutdown()
        except Exception:
            pass