Factor Studios
Upload 167 files
684cc60 verified
"""
Virtual Flash Module - Simulates NAND Flash behavior
"""
import os
import json
import threading
import time
import random
import base64
from typing import Dict, Optional, Tuple
from builtins import open
class VirtualFlash:
    """
    Simulates NAND Flash behavior with pages, blocks, and wear leveling.
    Stores data entirely in-memory and provides snapshot functionality for persistence.

    Pages are kept sparsely in a dict keyed by absolute page number; a page
    that was never written (or was erased) reads back as all zero bytes.
    Per-block bookkeeping (erase count, bad-block flag, last erase time) is
    kept in ``block_metadata`` keyed by integer block number.
    """

    def __init__(self, capacity_gb: int = 2048, page_size: int = 4096, pages_per_block: int = 256):
        """
        Create an empty virtual flash device.

        Args:
            capacity_gb: Device capacity in GiB (multiplied by 1024**3).
            page_size: Size of one page in bytes.
            pages_per_block: Number of pages in one erase block.
        """
        self.capacity_gb = capacity_gb
        self.page_size = page_size  # 4KB pages by default
        self.pages_per_block = pages_per_block  # 256 pages per block by default
        self.block_size = page_size * pages_per_block  # 1MB blocks by default

        # Calculate total pages and blocks
        self.total_bytes = capacity_gb * 1024 * 1024 * 1024
        self.total_pages = self.total_bytes // page_size
        self.total_blocks = self.total_pages // pages_per_block

        # In-memory storage for flash pages: {page_number: data_bytes}
        self.pages: Dict[int, bytes] = {}

        # In-memory storage for block metadata:
        # {block_number: {'erase_count': int, 'bad_block': bool, 'last_erase_time': float}}
        self.block_metadata: Dict[int, Dict] = {}

        # Global metadata
        self.metadata = {
            "total_writes": 0,
            "total_erases": 0,
            "bad_blocks": [],
            "wear_leveling_threshold": 1000,
            "power_cycles": 0
        }

        # Re-entrant lock guarding pages, block_metadata and metadata
        self.lock = threading.RLock()

        print(f"VirtualFlash initialized: {capacity_gb}GB, {self.total_pages:,} pages, {self.total_blocks:,} blocks")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_block_meta() -> Dict:
        """Return a fresh metadata record for a block seen for the first time."""
        return {'erase_count': 0, 'bad_block': False, 'last_erase_time': time.time()}

    def _check_page_number(self, page_number: int) -> None:
        """Raise ValueError when ``page_number`` is outside ``[0, total_pages)``."""
        if page_number < 0:
            raise ValueError(f"Page number {page_number} must be non-negative")
        if page_number >= self.total_pages:
            raise ValueError(f"Page number {page_number} exceeds capacity")

    def _is_bad_block(self, block_number: int) -> bool:
        """
        Report whether a block is marked bad.

        Both bookkeeping locations are consulted (the per-block flag and the
        global ``bad_blocks`` list) so that reads, writes and erases always
        agree on a block's state.
        """
        if self.block_metadata.get(block_number, {}).get('bad_block', False):
            return True
        return block_number in self.metadata.get("bad_blocks", [])

    def _block_page_range(self, block_number: int) -> range:
        """Return the range of absolute page numbers that belong to a block."""
        first_page = block_number * self.pages_per_block
        return range(first_page, first_page + self.pages_per_block)

    # ------------------------------------------------------------------
    # Addressing
    # ------------------------------------------------------------------
    def get_page_address(self, page_number: int) -> Tuple[int, int]:
        """Convert page number to block and page within block."""
        block_number = page_number // self.pages_per_block
        page_in_block = page_number % self.pages_per_block
        return block_number, page_in_block

    def get_page_number(self, block_number: int, page_in_block: int) -> int:
        """Convert block and page within block to page number."""
        return block_number * self.pages_per_block + page_in_block

    # ------------------------------------------------------------------
    # Page / block operations
    # ------------------------------------------------------------------
    def write_page(self, page_number: int, data: bytes) -> bool:
        """
        Write data to a specific page.
        Simulates NAND flash write behavior.

        Data shorter than a page is right-padded with zero bytes.

        Args:
            page_number: Absolute page number to write.
            data: Payload, at most ``page_size`` bytes.

        Returns:
            True when the page was stored, False when the target block is bad.

        Raises:
            ValueError: If the page number is out of range or the data is
                larger than one page.
        """
        self._check_page_number(page_number)
        if len(data) > self.page_size:
            raise ValueError(f"Data size {len(data)} exceeds page size {self.page_size}")

        # Pad data to page size
        if len(data) < self.page_size:
            data = data + b'\x00' * (self.page_size - len(data))

        block_number, _ = self.get_page_address(page_number)

        with self.lock:
            # Writes to a bad block are rejected
            if self._is_bad_block(block_number):
                return False

            # Simulate write delay
            time.sleep(0.0001)  # 0.1ms write delay

            # Update page data
            self.pages[page_number] = data

            # Update block metadata (initialize if not exists)
            if block_number not in self.block_metadata:
                self.block_metadata[block_number] = self._new_block_meta()

            # Update global metadata
            self.metadata["total_writes"] += 1

            # Simulate wear (very small chance of block going bad)
            if random.random() < 0.000001:  # 1 in 1,000,000 chance
                self.metadata.setdefault("bad_blocks", []).append(block_number)
                self.block_metadata[block_number]['bad_block'] = True
                print(f"Block {block_number} marked as bad due to wear")

            return True

    def read_page(self, page_number: int) -> Optional[bytes]:
        """
        Read data from a specific page.

        Args:
            page_number: Absolute page number to read.

        Returns:
            The stored page contents. A page that was never written, was
            erased, or lives in a bad block reads back as ``page_size`` zero
            bytes.

        Raises:
            ValueError: If the page number is out of range.
        """
        self._check_page_number(page_number)

        with self.lock:
            # Simulate read delay
            time.sleep(0.00005)  # 0.05ms read delay

            blank_page = b'\x00' * self.page_size

            # Bad blocks always read back as empty
            block_number, _ = self.get_page_address(page_number)
            if self._is_bad_block(block_number):
                return blank_page

            return self.pages.get(page_number, blank_page)

    def erase_block(self, block_number: int) -> bool:
        """
        Erase an entire block (drop all of its stored pages).
        Simulates block erase operation.

        Args:
            block_number: Block to erase.

        Returns:
            True when the block was erased, False when the block is bad.

        Raises:
            ValueError: If the block number is out of range.
        """
        if block_number < 0:
            raise ValueError(f"Block number {block_number} must be non-negative")
        if block_number >= self.total_blocks:
            raise ValueError(f"Block number {block_number} exceeds capacity")

        with self.lock:
            # Bad blocks cannot be erased
            if self._is_bad_block(block_number):
                return False

            # Simulate erase delay (much longer than write)
            time.sleep(0.002)  # 2ms erase delay

            # Drop only this block's pages; no need to scan every stored page
            for p_num in self._block_page_range(block_number):
                self.pages.pop(p_num, None)

            # Update block metadata
            block_meta = self.block_metadata.setdefault(block_number, self._new_block_meta())
            block_meta['erase_count'] += 1
            block_meta['last_erase_time'] = time.time()

            # Update global metadata
            self.metadata["total_erases"] += 1

            return True

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------
    def get_block_info(self, block_number: int) -> Dict:
        """
        Get information about a specific block.

        Returns:
            A dict with the block number, its erase count, bad-block flag,
            last erase time (0 if the block has no metadata yet), the number
            of currently stored pages and the pages-per-block total.
        """
        with self.lock:
            block_meta = self.block_metadata.get(
                block_number,
                {'erase_count': 0, 'bad_block': False, 'last_erase_time': 0},
            )
            valid_pages = sum(
                1 for p_num in self._block_page_range(block_number) if p_num in self.pages
            )
            return {
                "block_number": block_number,
                "erase_count": block_meta['erase_count'],
                "is_bad": block_meta['bad_block'],
                "last_erase_time": block_meta['last_erase_time'],
                "valid_pages": valid_pages,
                "total_pages": self.pages_per_block
            }

    def get_flash_stats(self) -> Dict:
        """Get overall flash statistics (usage, page/block counts, erase totals)."""
        with self.lock:
            written_pages = len(self.pages)
            used_blocks = len({p_num // self.pages_per_block for p_num in self.pages})
            used_bytes = written_pages * self.page_size
            usage_percent = (used_bytes / self.total_bytes) * 100 if self.total_bytes > 0 else 0
            return {
                "capacity_gb": self.capacity_gb,
                "total_bytes": self.total_bytes,
                "used_bytes": used_bytes,
                "free_bytes": self.total_bytes - used_bytes,
                "usage_percent": usage_percent,
                "total_pages": self.total_pages,
                "written_pages": written_pages,
                "free_pages": self.total_pages - written_pages,
                "total_blocks": self.total_blocks,
                "used_blocks": used_blocks,
                "free_blocks": self.total_blocks - used_blocks,
                "bad_blocks": len(self.metadata.get("bad_blocks", [])),
                "total_erases": self.metadata.get("total_erases", 0)
            }

    # ------------------------------------------------------------------
    # Snapshots
    # ------------------------------------------------------------------
    def export_snapshot(self) -> str:
        """
        Exports the current state of the virtual flash as a base64 encoded JSON string.

        Page payloads are individually base64 encoded. JSON object keys are
        always strings, so page and block numbers are stored as strings and
        converted back to integers by ``mount_snapshot``.
        """
        with self.lock:
            snapshot_data = {
                "pages": {str(k): base64.b64encode(v).decode("utf-8") for k, v in self.pages.items()},
                "block_metadata": self.block_metadata,
                "metadata": self.metadata,
                "capacity_gb": self.capacity_gb,
                "page_size": self.page_size,
                "pages_per_block": self.pages_per_block
            }
            return base64.b64encode(json.dumps(snapshot_data).encode("utf-8")).decode("utf-8")

    def mount_snapshot(self, snapshot_json_or_url: str):
        """
        Loads the state of the virtual flash from a base64 encoded JSON string.

        The snapshot is fully decoded and validated into local values before
        any attribute is replaced, so a failed mount leaves the current state
        untouched.

        Args:
            snapshot_json_or_url: Base64 string as produced by
                ``export_snapshot``.

        Raises:
            Exception: Whatever decoding/parsing error occurred (e.g. invalid
                base64 or JSON, or a missing ``pages`` / ``block_metadata`` /
                ``metadata`` key). The error is printed and re-raised.
        """
        with self.lock:
            try:
                decoded_data = base64.b64decode(snapshot_json_or_url).decode("utf-8")
                snapshot_data = json.loads(decoded_data)

                capacity_gb = snapshot_data.get("capacity_gb", self.capacity_gb)
                page_size = snapshot_data.get("page_size", self.page_size)
                pages_per_block = snapshot_data.get("pages_per_block", self.pages_per_block)
                total_bytes = capacity_gb * 1024 * 1024 * 1024
                total_pages = total_bytes // page_size
                total_blocks = total_pages // pages_per_block

                pages = {int(k): base64.b64decode(v) for k, v in snapshot_data["pages"].items()}
                # JSON turned the integer block numbers into strings; restore
                # them so integer lookups find the erase counts and bad flags.
                block_metadata = {int(k): v for k, v in snapshot_data["block_metadata"].items()}
                metadata = snapshot_data["metadata"]
            except Exception as e:
                print(f"Error loading snapshot: {e}")
                raise

            # Everything parsed successfully: commit the new state in one go.
            self.capacity_gb = capacity_gb
            self.page_size = page_size
            self.pages_per_block = pages_per_block
            self.block_size = page_size * pages_per_block
            self.total_bytes = total_bytes
            self.total_pages = total_pages
            self.total_blocks = total_blocks
            self.pages = pages
            self.block_metadata = block_metadata
            self.metadata = metadata
            print("VirtualFlash state loaded from snapshot.")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def shutdown(self):
        """
        No specific shutdown needed for in-memory flash, as state is not persisted to disk automatically.
        """
        print("VirtualFlash shutdown complete")

    def __del__(self):
        """
        Destructor to ensure proper cleanup.
        """
        try:
            self.shutdown()
        except Exception:
            # Best effort only: during interpreter teardown even print() may
            # fail, and a destructor must never propagate errors.
            pass