Spaces:
Runtime error
Runtime error
| """ | |
| Virtual SSD Main Module - Integrates all components | |
| """ | |
| import os | |
| import shutil | |
| import json | |
| import base64 | |
| from typing import Optional, Dict | |
| from builtins import open | |
| from .virtual_flash import VirtualFlash | |
| from .file_system_map import FileSystemMap | |
| from .ssd_controller import SSDController | |
| from .virtual_ram_buffer import VirtualRAMBuffer | |
| from .virtual_driver import VirtualDriver | |
| from .virtual_os import VirtualOS | |
| from .app_interface import AppInterface | |
| from .persistent_virtual_disk import PersistentVirtualDisk | |
| from .volatile_virtual_disk import VolatileVirtualDisk | |
| class VirtualSSD: | |
| """ | |
| Main class to integrate all virtual SSD components. | |
| Provides mount, shutdown, and application-level file operations. | |
| """ | |
| def __init__(self, capacity_gb: int = 2048, page_size: int = 4096, pages_per_block: int = 256): | |
| self.capacity_gb = capacity_gb | |
| self.page_size = page_size | |
| self.pages_per_block = pages_per_block | |
| # Calculate total logical blocks based on capacity and page size | |
| # Assuming 1 logical block = 1 page for simplicity in initial FTL | |
| self.total_logical_blocks = (capacity_gb * 1024 * 1024 * 1024) // page_size | |
| self.flash: Optional[VirtualFlash] = None | |
| self.ram_buffer: Optional[VirtualRAMBuffer] = None | |
| self.controller: Optional[SSDController] = None | |
| self.file_system: Optional[FileSystemMap] = None | |
| self.driver: Optional[VirtualDriver] = None | |
| self.os: Optional[VirtualOS] = None | |
| self.app_interface: Optional[AppInterface] = None | |
| self.mounted = False | |
| # Persistent Virtual Disk | |
| self.pvd = PersistentVirtualDisk(capacity_gb, page_size, pages_per_block) | |
| self.vvd: Optional[VolatileVirtualDisk] = None | |
| print("VirtualSSD instance created.") | |
| def mount(self): | |
| """ | |
| Mount the virtual SSD, initializing all components. | |
| """ | |
| if self.mounted: | |
| print("Virtual SSD already mounted.") | |
| return | |
| print("Mounting Virtual SSD...") | |
| # Ensure storage directory exists for snapshot | |
| os.makedirs("virtual_ssd_data", exist_ok=True) | |
| try: | |
| # Load PVD state | |
| pvd_snapshot_path = os.path.join("virtual_ssd_data", "pvd_snapshot.json") | |
| pvd_snapshot_data = None | |
| if os.path.exists(pvd_snapshot_path): | |
| with open(pvd_snapshot_path, 'r') as f: | |
| pvd_snapshot_data = json.load(f) | |
| self.pvd.load_from_storage(pvd_snapshot_data['pvd_state']) | |
| else: | |
| self.pvd.load_from_storage() # Initialize fresh state | |
| self.flash = self.pvd.get_flash() | |
| self.controller = self.pvd.get_controller() | |
| self.file_system = self.pvd.get_file_system() | |
| self.vvd = VolatileVirtualDisk(self.flash, self.file_system, self.controller) | |
| self.ram_buffer = VirtualRAMBuffer(capacity_bytes=128 * 1024 * 1024) # 128MB buffer | |
| self.driver = VirtualDriver(self.vvd, self.page_size) # Driver interacts with VVD | |
| self.os = VirtualOS(self.file_system, self.driver, self.ram_buffer) | |
| self.app_interface = AppInterface(self.os) | |
| self.mounted = True | |
| print("Virtual SSD mounted successfully.") | |
| except Exception as e: | |
| print(f"Error mounting Virtual SSD: {e}") | |
| self.shutdown() # Attempt to clean up | |
| self.mounted = False | |
| def shutdown(self): | |
| """ | |
| Shutdown the virtual SSD, saving all states to a snapshot. | |
| """ | |
| if not self.mounted: | |
| print("Virtual SSD not mounted.") | |
| return | |
| print("Shutting down Virtual SSD...") | |
| try: | |
| if self.app_interface: | |
| self.app_interface.sync() # Ensure all data is flushed | |
| if self.vvd: | |
| self.vvd.shutdown() # Flush dirty pages to PVD | |
| if self.os: | |
| self.os.shutdown() | |
| if self.driver: | |
| self.driver.shutdown() | |
| # Save PVD state | |
| os.makedirs("virtual_ssd_data", exist_ok=True) # Ensure directory exists | |
| pvd_snapshot_path = os.path.join("virtual_ssd_data", "pvd_snapshot.json") | |
| pvd_state = self.pvd.save_to_storage() | |
| with open(pvd_snapshot_path, 'w') as f: | |
| json.dump({"pvd_state": pvd_state}, f, indent=2) | |
| print(f"PVD state saved to snapshot: {pvd_snapshot_path}") | |
| self.pvd.shutdown() | |
| self.mounted = False | |
| print("Virtual SSD shutdown complete.") | |
| except Exception as e: | |
| print(f"Error during Virtual SSD shutdown: {e}") | |
| def save_file(self, filename: str, data: bytes) -> bool: | |
| """ | |
| Save a file to the virtual SSD. | |
| """ | |
| if not self.mounted or not self.app_interface: | |
| print("Error: Virtual SSD not mounted.") | |
| return False | |
| return self.app_interface.save(filename, data) | |
| def read_file(self, filename: str) -> Optional[bytes]: | |
| """ | |
| Read a file from the virtual SSD. | |
| """ | |
| if not self.mounted or not self.app_interface: | |
| print("Error: Virtual SSD not mounted.") | |
| return None | |
| return self.app_interface.load(filename) | |
| def delete_file(self, filename: str) -> bool: | |
| """ | |
| Delete a file from the virtual SSD. | |
| """ | |
| if not self.mounted or not self.app_interface: | |
| print("Error: Virtual SSD not mounted.") | |
| return False | |
| return self.app_interface.delete(filename) | |
| def list_files(self) -> Dict: | |
| """ | |
| List all files and their metadata on the virtual SSD. | |
| """ | |
| if not self.mounted or not self.app_interface: | |
| print("Error: Virtual SSD not mounted.") | |
| return {} | |
| return self.app_interface.list_files() | |
| def get_capacity_info(self) -> Dict: | |
| """ | |
| Get capacity information of the virtual SSD. | |
| """ | |
| if not self.mounted or not self.app_interface: | |
| print("Error: Virtual SSD not mounted.") | |
| return {} | |
| return self.app_interface.get_capacity_info() | |
| def get_full_stats(self) -> Dict: | |
| """ | |
| Get comprehensive statistics from all layers. | |
| """ | |
| if not self.mounted or not self.app_interface or not self.os or not self.flash or not self.controller: | |
| print("Error: Virtual SSD not mounted or components missing.") | |
| return {} | |
| return { | |
| "flash_stats": self.flash.get_flash_stats(), | |
| "ftl_stats": self.controller.get_ftl_stats(), | |
| "file_system_stats": self.file_system.get_usage_stats(), | |
| "ram_buffer_stats": self.ram_buffer.get_buffer_status() | |
| } | |
| def format_ssd(self): | |
| """ | |
| Formats the virtual SSD, deleting all data and resetting state. | |
| """ | |
| if self.mounted: | |
| self.shutdown() | |
| print("Formatting Virtual SSD...") | |
| storage_dir = "virtual_ssd_data" | |
| if os.path.exists(storage_dir): | |
| shutil.rmtree(storage_dir) | |
| print(f"Removed existing storage directory: {storage_dir}") | |
| self.pvd = PersistentVirtualDisk(self.capacity_gb, self.page_size, self.pages_per_block) | |
| self.pvd.format_disk() | |
| self.flash = self.pvd.get_flash() | |
| self.controller = self.pvd.get_controller() | |
| self.file_system = self.pvd.get_file_system() | |
| self.ram_buffer = VirtualRAMBuffer(capacity_bytes=128 * 1024 * 1024) | |
| self.driver = VirtualDriver(self.controller, self.page_size) | |
| self.os = VirtualOS(self.file_system, self.driver, self.ram_buffer) | |
| self.app_interface = AppInterface(self.os) | |
| self.mounted = True | |
| print("Virtual SSD formatted and re-mounted.") | |
| def __del__(self): | |
| """ | |
| Ensure shutdown is called on object deletion. | |
| """ | |
| try: | |
| if self.mounted: | |
| self.shutdown() | |
| except: | |
| pass | |
| if __name__ == "__main__": | |
| # Example Usage | |
| ssd = VirtualSSD(capacity_gb=2) # Create a 2GB virtual SSD for testing | |
| ssd.mount() | |
| # Test file operations | |
| test_data_small = b"Hello, this is a small test file." * 10 # 330 bytes | |
| test_data_large = b"This is a larger test file content." * 1000 # 35KB | |
| test_data_very_large = b"A very large file for testing purposes. " * 100000 # ~4MB | |
| print("\n--- Testing file saves ---") | |
| ssd.save_file("small_file.txt", test_data_small) | |
| ssd.save_file("large_file.bin", test_data_large) | |
| ssd.save_file("video.mp4", test_data_very_large) | |
| ssd.save_file("another_file.txt", b"Some more data.") | |
| # Upload the created test file | |
| with open("/home/ubuntu/test_upload_file.txt", "rb") as f: | |
| uploaded_data = f.read() | |
| ssd.save_file("uploaded_test_file.txt", uploaded_data) | |
| print("Uploaded test_upload_file.txt to virtual SSD.") | |
| print("\n--- Listing files ---") | |
| files = ssd.list_files() | |
| for filename, info in files.items(): | |
| print(f"File: {filename}, Size: {info['size']} bytes, Blocks: {len(info['blocks'])}") | |
| print("\n--- Checking capacity info ---") | |
| capacity_info = ssd.get_capacity_info() | |
| print(f"Total: {capacity_info['total_gb']} GB, Used: {capacity_info['used_gb']} GB, Free: {capacity_info['free_gb']} GB, Usage: {capacity_info['usage_percent']:.2f}%") | |
| print("\n--- Reading files ---") | |
| read_small_data = ssd.read_file("small_file.txt") | |
| print(f"Read small_file.txt: {read_small_data[:50]}...") | |
| assert read_small_data == test_data_small | |
| read_large_data = ssd.read_file("large_file.bin") | |
| print(f"Read large_file.bin: {read_large_data[:50]}...") | |
| assert read_large_data == test_data_large | |
| read_video_data = ssd.read_file("video.mp4") | |
| print(f"Read video.mp4: {read_video_data[:50]}...") | |
| assert read_video_data == test_data_very_large | |
| read_uploaded_data = ssd.read_file("uploaded_test_file.txt") | |
| print(f"Read uploaded_test_file.txt: {read_uploaded_data[:50]}...") | |
| assert read_uploaded_data == uploaded_data | |
| print("\n--- Deleting a file ---") | |
| ssd.delete_file("large_file.bin") | |
| print("\n--- Listing files after deletion ---") | |
| files = ssd.list_files() | |
| for filename, info in files.items(): | |
| print(f"File: {filename}, Size: {info['size']} bytes, Blocks: {len(info['blocks'])}") | |
| print("\n--- Checking capacity info after deletion ---") | |
| capacity_info = ssd.get_capacity_info() | |
| print(f"Total: {capacity_info['total_gb']} GB, Used: {capacity_info['used_gb']} GB, Free: {capacity_info['free_gb']} GB, Usage: {capacity_info['usage_percent']:.2f}%") | |
| print("\n--- Testing persistence ---") | |
| ssd.shutdown() | |
| print("SSD shut down. Re-mounting to check persistence.") | |
| ssd_reloaded = VirtualSSD(capacity_gb=2) # Same capacity | |
| ssd_reloaded.mount() | |
| print("\n--- Listing files after re-mount ---") | |
| files_reloaded = ssd_reloaded.list_files() | |
| for filename, info in files_reloaded.items(): | |
| print(f"File: {filename}, Size: {info['size']} bytes, Blocks: {len(info['blocks'])}") | |
| read_small_data_reloaded = ssd_reloaded.read_file("small_file.txt") | |
| assert read_small_data_reloaded == test_data_small | |
| print("small_file.txt read successfully after remount.") | |
| read_uploaded_data_reloaded = ssd_reloaded.read_file("uploaded_test_file.txt") | |
| assert read_uploaded_data_reloaded == uploaded_data | |
| print("uploaded_test_file.txt read successfully after remount.") | |
| # Test formatting | |
| print("\n--- Testing format ---") | |
| ssd_reloaded.format_ssd() | |
| print("\n--- Listing files after format ---") | |
| files_after_format = ssd_reloaded.list_files() | |
| print(f"Files after format: {files_after_format}") | |
| assert not files_after_format | |
| ssd_reloaded.shutdown() | |
| print("All tests complete.") | |