Spaces:
Runtime error
Runtime error
| """ | |
| Virtual OS Module - Handles file system operations, memory allocation, and buffering | |
| """ | |
| import threading | |
| import time | |
| from typing import Dict, Optional | |
class VirtualOS:
    """
    Simulates OS-level operations including file system management,
    memory allocation, and write buffering.

    The class coordinates three collaborators supplied by the caller:

    * ``file_system`` - looked up for file metadata (``get_file_info``),
      and asked to create, resize, delete and list files.
    * ``driver`` - exposes ``block_size`` and performs the block-level
      ``send_write`` / ``send_read`` / ``send_trim`` operations.
    * ``ram_buffer`` - a write buffer consulted before the driver on reads
      and drained by :meth:`flush_buffers`.

    File operations (write, read, delete, flush) run while holding a
    re-entrant lock stored in ``self.lock``.
    """

    def __init__(self, file_system_map, virtual_driver, virtual_ram_buffer):
        """
        Store the collaborators and initialise the auto-flush bookkeeping.

        Args:
            file_system_map: Object providing the file metadata operations.
            virtual_driver: Object providing block-level I/O and ``block_size``.
            virtual_ram_buffer: Object providing the write buffer operations.
        """
        self.file_system = file_system_map
        self.driver = virtual_driver
        self.ram_buffer = virtual_ram_buffer
        # Re-entrant so write_file() can call flush_buffers() while locked.
        self.lock = threading.RLock()
        # Auto-flush settings
        self.auto_flush_enabled = True
        self.flush_interval = 5.0  # Flush every 5 seconds
        self.last_flush_time = time.time()
        print("VirtualOS initialized")

    def write_file(self, filename: str, data: bytes) -> bool:
        """
        Write data to a file. Handles file creation, block allocation,
        and data writing through the driver.

        The file is created when it does not exist yet, otherwise its size
        is updated. The data is then split into ``driver.block_size`` chunks,
        one per allocated block; allocated blocks beyond the end of the data
        are filled with zero bytes.

        Args:
            filename: Name of the file to create or overwrite.
            data: Complete new content of the file.

        Returns:
            True when every block was accepted by the buffer or the driver.
            False when the file could not be created or resized, when the
            allocated blocks are too small to hold ``data``, when a block
            write fails, or when any exception is raised along the way.
        """
        with self.lock:
            try:
                # Create the file, or resize it when it already exists.
                if self.file_system.get_file_info(filename) is None:
                    inode = self.file_system.create_file(filename, len(data))
                    if inode is None:
                        print(f"Failed to create file '{filename}'")
                        return False
                elif not self.file_system.update_file_size(filename, len(data)):
                    print(f"Failed to update file size for '{filename}'")
                    return False

                # Re-read the metadata to pick up the current block list.
                file_info = self.file_system.get_file_info(filename)
                blocks = list(file_info["blocks"])
                block_size = self.driver.block_size
                total_len = len(data)

                # Refuse to write when the allocation cannot hold the data;
                # otherwise the loop below would drop the tail silently and
                # still report success.
                capacity = len(blocks) * block_size
                if total_len > capacity:
                    print(
                        f"Not enough blocks allocated for file '{filename}' "
                        f"({total_len} bytes requested, {capacity} available)"
                    )
                    return False

                bytes_written = 0
                for index, logical_block in enumerate(blocks):
                    start_offset = index * block_size
                    if start_offset >= total_len:
                        # No more data to write, fill with zeros
                        block_data = b'\x00' * block_size
                    else:
                        block_data = data[start_offset:start_offset + block_size]

                    if not self._write_block(logical_block, block_data):
                        print(f"Failed to write block {logical_block} for file '{filename}'")
                        return False
                    bytes_written += len(block_data)

                # Check if auto-flush is needed
                if self._should_auto_flush():
                    self.flush_buffers()
                print(f"File '{filename}' written successfully ({bytes_written} bytes)")
                return True
            except Exception as e:
                print(f"Error writing file '{filename}': {e}")
                return False

    def _write_block(self, logical_block, block_data: bytes) -> bool:
        """
        Store one block, preferring the RAM buffer over the driver.

        The block goes to the buffer when buffering is enabled and the
        buffer accepts it; otherwise it is written directly through the
        driver.

        Returns:
            True when either the buffer or the driver accepted the block.
        """
        if self._should_use_buffer() and self.ram_buffer.write_to_buffer(
            logical_block, block_data
        ):
            return True
        # Buffering disabled or buffer refused the block: write directly.
        return bool(self.driver.send_write(logical_block, block_data))

    def read_file(self, filename: str) -> Optional[bytes]:
        """
        Read data from a file.

        Each block is looked up in the RAM buffer first and fetched from the
        driver only when the buffer has no entry for it. The concatenated
        blocks are trimmed to the size recorded in the file metadata.

        Args:
            filename: Name of the file to read.

        Returns:
            The file content, or None when the file is unknown, a block
            cannot be read, or any exception is raised along the way.
        """
        with self.lock:
            try:
                file_info = self.file_system.get_file_info(filename)
                if file_info is None:
                    print(f"File '{filename}' not found")
                    return None
                file_size = file_info["size"]

                data_parts = []
                for logical_block in file_info["blocks"]:
                    # Check buffer first
                    block_data = self.ram_buffer.read_from_buffer(logical_block)
                    if block_data is None:
                        # Not in buffer, read from driver
                        block_data = self.driver.send_read(logical_block)
                        if block_data is None:
                            print(f"Failed to read block {logical_block} for file '{filename}'")
                            return None
                    data_parts.append(block_data)

                # Combine all blocks and trim to actual file size
                return b''.join(data_parts)[:file_size]
            except Exception as e:
                print(f"Error reading file '{filename}': {e}")
                return None

    def delete_file(self, filename: str) -> bool:
        """
        Delete a file and free its blocks.

        A TRIM command is sent to the driver for every block of the file
        before the file is removed from the file system.

        Args:
            filename: Name of the file to delete.

        Returns:
            The result of ``file_system.delete_file``; False when the file
            is unknown or any exception is raised along the way.
        """
        with self.lock:
            try:
                file_info = self.file_system.get_file_info(filename)
                if file_info is None:
                    print(f"File '{filename}' not found")
                    return False
                # NOTE(review): blocks of this file that are still held in the
                # RAM buffer are not invalidated here, so a later flush may
                # write them back after the TRIM - confirm whether the buffer
                # offers an invalidation call.
                # Send TRIM commands for all blocks
                for logical_block in file_info["blocks"]:
                    self.driver.send_trim(logical_block)
                # Remove from file system
                return self.file_system.delete_file(filename)
            except Exception as e:
                print(f"Error deleting file '{filename}': {e}")
                return False

    def list_files(self) -> Dict:
        """
        List all files in the file system.

        Returns:
            Whatever ``file_system.list_files()`` returns.
        """
        return self.file_system.list_files()

    def get_file_info(self, filename: str) -> Optional[Dict]:
        """
        Get information about a specific file.

        Returns:
            Whatever ``file_system.get_file_info(filename)`` returns; the
            other methods of this class treat None as "file not found".
        """
        return self.file_system.get_file_info(filename)

    def flush_buffers(self):
        """
        Force flush all buffered data to storage.

        Every block returned by ``ram_buffer.flush_buffer()`` is written
        through the driver. A failed write is reported on stdout and the
        remaining blocks are still attempted. The flush timestamp used by
        the auto-flush check is refreshed afterwards.
        """
        with self.lock:
            print("Flushing OS buffers...")
            buffered_data = self.ram_buffer.flush_buffer()
            for logical_block, data in buffered_data.items():
                if not self.driver.send_write(logical_block, data):
                    print(f"Failed to flush block {logical_block}")
            self.last_flush_time = time.time()
            print("OS buffer flush complete")

    def _should_use_buffer(self) -> bool:
        """
        Determine if writes should be buffered.
        """
        return True  # Always use buffer for now

    def _should_auto_flush(self) -> bool:
        """
        Determine if auto-flush should be triggered.

        Returns:
            True when auto-flush is enabled and at least ``flush_interval``
            seconds have passed since the last flush.
        """
        if not self.auto_flush_enabled:
            return False
        return (time.time() - self.last_flush_time) >= self.flush_interval

    def get_os_stats(self) -> Dict:
        """
        Get OS-level statistics.

        Returns:
            A dict with the keys ``"file_system"``, ``"ram_buffer"`` and
            ``"driver"``, each holding the statistics reported by the
            corresponding collaborator.
        """
        fs_stats = self.file_system.get_usage_stats()
        buffer_stats = self.ram_buffer.get_buffer_status()
        driver_stats = self.driver.get_driver_stats()
        return {
            "file_system": fs_stats,
            "ram_buffer": buffer_stats,
            "driver": driver_stats,
        }

    def shutdown(self):
        """
        Shutdown the OS and flush any remaining data.
        """
        print("VirtualOS shutting down...")
        self.flush_buffers()
        print("VirtualOS shutdown complete")

    def __del__(self):
        """
        Best-effort shutdown when the instance is garbage collected.
        """
        try:
            self.shutdown()
        except Exception:
            # A finalizer must not raise: the instance may be only partially
            # initialised or the interpreter may be tearing down. Only
            # ordinary errors are suppressed; KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            pass